HBASE-14920: Compacting memstore Signed-off-by: stack <st...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a27504c7 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a27504c7 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a27504c7 Branch: refs/heads/master Commit: a27504c70181ec3585033eaee2523184c40a144f Parents: af5146e Author: eshcar <esh...@yahoo-inc.com> Authored: Mon May 16 15:50:20 2016 +0300 Committer: stack <st...@apache.org> Committed: Fri May 20 03:41:43 2016 -0700 ---------------------------------------------------------------------- .../apache/hadoop/hbase/HColumnDescriptor.java | 83 ++- .../test/IntegrationTestBigLinkedList.java | 4 +- .../hbase/regionserver/AbstractMemStore.java | 33 +- .../hbase/regionserver/CompactingMemStore.java | 406 +++++++++++ .../hbase/regionserver/CompactionPipeline.java | 190 +++++ .../hbase/regionserver/DefaultMemStore.java | 31 +- .../regionserver/FlushAllLargeStoresPolicy.java | 75 ++ .../regionserver/FlushLargeStoresPolicy.java | 54 +- .../FlushNonSloppyStoresFirstPolicy.java | 66 ++ .../hbase/regionserver/FlushPolicyFactory.java | 2 +- .../hadoop/hbase/regionserver/HMobStore.java | 4 + .../hadoop/hbase/regionserver/HRegion.java | 70 +- .../hadoop/hbase/regionserver/HStore.java | 17 +- .../hbase/regionserver/ImmutableSegment.java | 4 - .../hadoop/hbase/regionserver/MemStore.java | 14 +- .../hbase/regionserver/MemStoreCompactor.java | 197 +++++ .../regionserver/RegionServicesForStores.java | 49 +- .../hadoop/hbase/regionserver/Segment.java | 34 +- .../hbase/regionserver/SegmentFactory.java | 6 +- .../hbase/regionserver/SegmentScanner.java | 5 +- .../apache/hadoop/hbase/regionserver/Store.java | 1 + .../regionserver/VersionedSegmentsList.java | 54 ++ .../hbase/regionserver/wal/AbstractFSWAL.java | 19 +- .../hadoop/hbase/regionserver/wal/FSHLog.java | 18 +- .../regionserver/wal/SequenceIdAccounting.java | 35 +- .../hadoop/hbase/wal/DisabledWALProvider.java | 8 +- .../java/org/apache/hadoop/hbase/wal/WAL.java | 15 +- .../hadoop/hbase/HBaseTestingUtility.java | 51 +- .../org/apache/hadoop/hbase/TestIOFencing.java | 5 + .../apache/hadoop/hbase/io/TestHeapSize.java | 12 +- .../regionserver/TestCompactingMemStore.java | 729 +++++++++++++++++++ .../hbase/regionserver/TestDefaultMemStore.java | 159 ++-- .../hadoop/hbase/regionserver/TestHRegion.java | 63 +- .../TestHRegionWithInMemoryFlush.java | 61 ++ .../regionserver/TestPerColumnFamilyFlush.java | 29 +- .../TestWalAndCompactingMemStoreFlush.java | 565 ++++++++++++++ hbase-shell/src/main/ruby/hbase.rb | 1 + hbase-shell/src/main/ruby/hbase/admin.rb | 1 + 38 files changed, 2863 insertions(+), 307 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java index 3c16f4e..3153430 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java @@ -18,6 +18,8 @@ */ package org.apache.hadoop.hbase; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import java.io.IOException; import java.util.Collections; import java.util.HashMap; @@ -38,7 +40,6 @@ 
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.PrettyPrinter; import org.apache.hadoop.hbase.util.PrettyPrinter.Unit; -import com.google.common.base.Preconditions; /** * An HColumnDescriptor contains information about a column family such as the @@ -62,6 +63,8 @@ public class HColumnDescriptor implements Comparable<HColumnDescriptor> { // Version 11 -- add column family level configuration. private static final byte COLUMN_DESCRIPTOR_VERSION = (byte) 11; + private static final String IN_MEMORY_COMPACTION = "IN_MEMORY_COMPACTION"; + // These constants are used as FileInfo keys public static final String COMPRESSION = "COMPRESSION"; public static final String COMPRESSION_COMPACT = "COMPRESSION_COMPACT"; @@ -151,7 +154,7 @@ public class HColumnDescriptor implements Comparable<HColumnDescriptor> { * Default number of versions of a record to keep. */ public static final int DEFAULT_VERSIONS = HBaseConfiguration.create().getInt( - "hbase.column.max.version", 1); + "hbase.column.max.version", 1); /** * Default is not to keep a minimum of versions. @@ -170,6 +173,11 @@ public class HColumnDescriptor implements Comparable<HColumnDescriptor> { public static final boolean DEFAULT_IN_MEMORY = false; /** + * Default setting for whether to set the memstore of this column family as compacting or not. + */ + public static final boolean DEFAULT_IN_MEMORY_COMPACTION = false; + + /** * Default setting for preventing deleted from being collected immediately. */ public static final KeepDeletedCells DEFAULT_KEEP_DELETED = KeepDeletedCells.FALSE; @@ -246,30 +254,31 @@ public class HColumnDescriptor implements Comparable<HColumnDescriptor> { = new HashSet<Bytes>(); static { - DEFAULT_VALUES.put(BLOOMFILTER, DEFAULT_BLOOMFILTER); - DEFAULT_VALUES.put(REPLICATION_SCOPE, String.valueOf(DEFAULT_REPLICATION_SCOPE)); - DEFAULT_VALUES.put(HConstants.VERSIONS, String.valueOf(DEFAULT_VERSIONS)); - DEFAULT_VALUES.put(MIN_VERSIONS, String.valueOf(DEFAULT_MIN_VERSIONS)); - DEFAULT_VALUES.put(COMPRESSION, DEFAULT_COMPRESSION); - DEFAULT_VALUES.put(TTL, String.valueOf(DEFAULT_TTL)); - DEFAULT_VALUES.put(BLOCKSIZE, String.valueOf(DEFAULT_BLOCKSIZE)); - DEFAULT_VALUES.put(HConstants.IN_MEMORY, String.valueOf(DEFAULT_IN_MEMORY)); - DEFAULT_VALUES.put(BLOCKCACHE, String.valueOf(DEFAULT_BLOCKCACHE)); - DEFAULT_VALUES.put(KEEP_DELETED_CELLS, String.valueOf(DEFAULT_KEEP_DELETED)); - DEFAULT_VALUES.put(DATA_BLOCK_ENCODING, String.valueOf(DEFAULT_DATA_BLOCK_ENCODING)); - DEFAULT_VALUES.put(CACHE_DATA_ON_WRITE, String.valueOf(DEFAULT_CACHE_DATA_ON_WRITE)); - DEFAULT_VALUES.put(CACHE_DATA_IN_L1, String.valueOf(DEFAULT_CACHE_DATA_IN_L1)); - DEFAULT_VALUES.put(CACHE_INDEX_ON_WRITE, String.valueOf(DEFAULT_CACHE_INDEX_ON_WRITE)); - DEFAULT_VALUES.put(CACHE_BLOOMS_ON_WRITE, String.valueOf(DEFAULT_CACHE_BLOOMS_ON_WRITE)); - DEFAULT_VALUES.put(EVICT_BLOCKS_ON_CLOSE, String.valueOf(DEFAULT_EVICT_BLOCKS_ON_CLOSE)); - DEFAULT_VALUES.put(PREFETCH_BLOCKS_ON_OPEN, String.valueOf(DEFAULT_PREFETCH_BLOCKS_ON_OPEN)); - for (String s : DEFAULT_VALUES.keySet()) { - RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(s))); - } - RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(ENCRYPTION))); - RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(ENCRYPTION_KEY))); - RESERVED_KEYWORDS.add(new Bytes(IS_MOB_BYTES)); - RESERVED_KEYWORDS.add(new Bytes(MOB_THRESHOLD_BYTES)); + DEFAULT_VALUES.put(BLOOMFILTER, DEFAULT_BLOOMFILTER); + DEFAULT_VALUES.put(REPLICATION_SCOPE, String.valueOf(DEFAULT_REPLICATION_SCOPE)); + 
DEFAULT_VALUES.put(HConstants.VERSIONS, String.valueOf(DEFAULT_VERSIONS)); + DEFAULT_VALUES.put(MIN_VERSIONS, String.valueOf(DEFAULT_MIN_VERSIONS)); + DEFAULT_VALUES.put(COMPRESSION, DEFAULT_COMPRESSION); + DEFAULT_VALUES.put(TTL, String.valueOf(DEFAULT_TTL)); + DEFAULT_VALUES.put(BLOCKSIZE, String.valueOf(DEFAULT_BLOCKSIZE)); + DEFAULT_VALUES.put(HConstants.IN_MEMORY, String.valueOf(DEFAULT_IN_MEMORY)); + DEFAULT_VALUES.put(IN_MEMORY_COMPACTION, String.valueOf(DEFAULT_IN_MEMORY_COMPACTION)); + DEFAULT_VALUES.put(BLOCKCACHE, String.valueOf(DEFAULT_BLOCKCACHE)); + DEFAULT_VALUES.put(KEEP_DELETED_CELLS, String.valueOf(DEFAULT_KEEP_DELETED)); + DEFAULT_VALUES.put(DATA_BLOCK_ENCODING, String.valueOf(DEFAULT_DATA_BLOCK_ENCODING)); + DEFAULT_VALUES.put(CACHE_DATA_ON_WRITE, String.valueOf(DEFAULT_CACHE_DATA_ON_WRITE)); + DEFAULT_VALUES.put(CACHE_DATA_IN_L1, String.valueOf(DEFAULT_CACHE_DATA_IN_L1)); + DEFAULT_VALUES.put(CACHE_INDEX_ON_WRITE, String.valueOf(DEFAULT_CACHE_INDEX_ON_WRITE)); + DEFAULT_VALUES.put(CACHE_BLOOMS_ON_WRITE, String.valueOf(DEFAULT_CACHE_BLOOMS_ON_WRITE)); + DEFAULT_VALUES.put(EVICT_BLOCKS_ON_CLOSE, String.valueOf(DEFAULT_EVICT_BLOCKS_ON_CLOSE)); + DEFAULT_VALUES.put(PREFETCH_BLOCKS_ON_OPEN, String.valueOf(DEFAULT_PREFETCH_BLOCKS_ON_OPEN)); + for (String s : DEFAULT_VALUES.keySet()) { + RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(s))); + } + RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(ENCRYPTION))); + RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(ENCRYPTION_KEY))); + RESERVED_KEYWORDS.add(new Bytes(IS_MOB_BYTES)); + RESERVED_KEYWORDS.add(new Bytes(MOB_THRESHOLD_BYTES)); } private static final int UNINITIALIZED = -1; @@ -319,6 +328,7 @@ public class HColumnDescriptor implements Comparable<HColumnDescriptor> { setMinVersions(DEFAULT_MIN_VERSIONS); setKeepDeletedCells(DEFAULT_KEEP_DELETED); setInMemory(DEFAULT_IN_MEMORY); + setInMemoryCompaction(DEFAULT_IN_MEMORY_COMPACTION); setBlockCacheEnabled(DEFAULT_BLOCKCACHE); setTimeToLive(DEFAULT_TTL); setCompressionType(Compression.Algorithm.valueOf(DEFAULT_COMPRESSION.toUpperCase())); @@ -676,6 +686,27 @@ public class HColumnDescriptor implements Comparable<HColumnDescriptor> { return setValue(HConstants.IN_MEMORY, Boolean.toString(inMemory)); } + /** + * @return True if we prefer to keep the in-memory data compacted + * for this column family + */ + public boolean isInMemoryCompaction() { + String value = getValue(IN_MEMORY_COMPACTION); + if (value != null) { + return Boolean.parseBoolean(value); + } + return DEFAULT_IN_MEMORY_COMPACTION; + } + + /** + * @param inMemoryCompaction True if we prefer to keep the in-memory data compacted + * for this column family + * @return this (for chained invocation) + */ + public HColumnDescriptor setInMemoryCompaction(boolean inMemoryCompaction) { + return setValue(IN_MEMORY_COMPACTION, Boolean.toString(inMemoryCompaction)); + } + public KeepDeletedCells getKeepDeletedCells() { String value = getValue(KEEP_DELETED_CELLS); if (value != null) { http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java ---------------------------------------------------------------------- diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java index c864580..430c8a6 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java +++ 
b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java @@ -80,7 +80,7 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl; import org.apache.hadoop.hbase.mapreduce.WALPlayer; -import org.apache.hadoop.hbase.regionserver.FlushLargeStoresPolicy; +import org.apache.hadoop.hbase.regionserver.FlushAllLargeStoresPolicy; import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.testclassification.IntegrationTests; @@ -1586,7 +1586,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { Configuration conf = getTestingUtil(getConf()).getConfiguration(); if (isMultiUnevenColumnFamilies(getConf())) { // make sure per CF flush is on - conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName()); + conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName()); } int ret = ToolRunner.run(conf, new Loop(), new String[] { "1", "1", "2000000", http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java index c3724fc..0f27e0e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java @@ -64,7 +64,7 @@ public abstract class AbstractMemStore implements MemStore { (2 * Bytes.SIZEOF_LONG)); public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + - 2 * (ClassSize.ATOMIC_LONG + ClassSize.TIMERANGE_TRACKER + + (ClassSize.ATOMIC_LONG + ClassSize.TIMERANGE_TRACKER + ClassSize.CELL_SKIPLIST_SET + ClassSize.CONCURRENT_SKIPLISTMAP)); @@ -99,7 +99,7 @@ public abstract class AbstractMemStore implements MemStore { * @param onlyIfMoreRecent a flag that marks whether to update the sequence id no matter what or * only if it is greater than the previous sequence id */ - public abstract void updateLowestUnflushedSequenceIdInWal(boolean onlyIfMoreRecent); + public abstract void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent); /** * Write an update @@ -162,17 +162,9 @@ public abstract class AbstractMemStore implements MemStore { } /** - * An override on snapshot so the no arg version of the method implies zero seq num, - * like for cases without wal - */ - public MemStoreSnapshot snapshot() { - return snapshot(0); - } - - /** * The passed snapshot was successfully persisted; it can be let go. * @param id Id of the snapshot to clean out. - * @see MemStore#snapshot(long) + * @see MemStore#snapshot() */ @Override public void clearSnapshot(long id) throws UnexpectedStateException { @@ -201,18 +193,6 @@ public abstract class AbstractMemStore implements MemStore { } /** - * On flush, how much memory we will clear from the active cell set. - * - * @return size of data that is going to be flushed from active set - */ - @Override - public long getFlushableSize() { - long snapshotSize = getSnapshot().getSize(); - return snapshotSize > 0 ? 
snapshotSize : keySize(); - } - - - /** * @return a list containing a single memstore scanner. */ @Override @@ -230,7 +210,7 @@ public abstract class AbstractMemStore implements MemStore { StringBuffer buf = new StringBuffer(); int i = 1; try { - for (Segment segment : getListOfSegments()) { + for (Segment segment : getSegments()) { buf.append("Segment (" + i + ") " + segment.toString() + "; "); i++; } @@ -471,9 +451,6 @@ public abstract class AbstractMemStore implements MemStore { * Returns an ordered list of segments from most recent to oldest in memstore * @return an ordered list of segments from most recent to oldest in memstore */ - protected abstract List<Segment> getListOfSegments() throws IOException; + protected abstract List<Segment> getSegments() throws IOException; - public long getActiveSize() { - return getActive().getSize(); - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java new file mode 100644 index 0000000..7aaece6 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -0,0 +1,406 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.wal.WAL; + +/** + * A memstore implementation which supports in-memory compaction. + * A compaction pipeline is added between the active set and the snapshot data structures; + * it consists of a list of kv-sets that are subject to compaction. + * Like the snapshot, all pipeline components are read-only; updates only affect the active set. 
+ * To ensure this property we take advantage of the existing blocking mechanism -- the active set + * is pushed to the pipeline while holding the region's updatesLock in exclusive mode. + * Periodically, a compaction is applied in the background to all pipeline components resulting + * in a single read-only component. The ``old'' components are discarded when no scanner is reading + * them. + */ +@InterfaceAudience.Private +public class CompactingMemStore extends AbstractMemStore { + public final static long DEEP_OVERHEAD_PER_PIPELINE_ITEM = ClassSize.align( + ClassSize.TIMERANGE_TRACKER + ClassSize.TIMERANGE + + ClassSize.CELL_SKIPLIST_SET + ClassSize.CONCURRENT_SKIPLISTMAP); + // Default fraction of in-memory-flush size w.r.t. flush-to-disk size + public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY = + "hbase.memestore.inmemoryflush.threshold.factor"; + private static final double IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT = 0.25; + + private static final Log LOG = LogFactory.getLog(CompactingMemStore.class); + private Store store; + private RegionServicesForStores regionServices; + private CompactionPipeline pipeline; + private MemStoreCompactor compactor; + // the threshold on active size for in-memory flush + private long inmemoryFlushSize; + private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false); + @VisibleForTesting + private final AtomicBoolean allowCompaction = new AtomicBoolean(true); + + public CompactingMemStore(Configuration conf, CellComparator c, + HStore store, RegionServicesForStores regionServices) throws IOException { + super(conf, c); + this.store = store; + this.regionServices = regionServices; + this.pipeline = new CompactionPipeline(getRegionServices()); + this.compactor = new MemStoreCompactor(this); + initInmemoryFlushSize(conf); + } + + private void initInmemoryFlushSize(Configuration conf) { + long memstoreFlushSize = getRegionServices().getMemstoreFlushSize(); + int numStores = getRegionServices().getNumStores(); + if (numStores <= 1) { + // Family number might also be zero in some of our unit test case + numStores = 1; + } + inmemoryFlushSize = memstoreFlushSize / numStores; + // multiply by a factor + double factor = conf.getDouble(IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, + IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT); + inmemoryFlushSize *= factor; + LOG.debug("Setting in-memory flush size threshold to " + inmemoryFlushSize); + } + + public static long getSegmentSize(Segment segment) { + return segment.getSize() - DEEP_OVERHEAD_PER_PIPELINE_ITEM; + } + + public static long getSegmentsSize(List<? extends Segment> list) { + long res = 0; + for (Segment segment : list) { + res += getSegmentSize(segment); + } + return res; + } + + /** + * @return Total memory occupied by this MemStore. + * This is not thread safe and the memstore may be changed while computing its size. + * It is the responsibility of the caller to make sure this doesn't happen. + */ + @Override + public long size() { + long res = 0; + for (Segment item : getSegments()) { + res += item.getSize(); + } + return res; + } + + /** + * This method is called when it is clear that the flush to disk is completed. + * The store may do any post-flush actions at this point. + * One example is to update the WAL with sequence number that is known only at the store level. 
+ */ + @Override public void finalizeFlush() { + updateLowestUnflushedSequenceIdInWAL(false); + } + + @Override public boolean isSloppy() { + return true; + } + + /** + * Push the current active memstore segment into the pipeline + * and create a snapshot of the tail of current compaction pipeline + * Snapshot must be cleared by call to {@link #clearSnapshot}. + * {@link #clearSnapshot(long)}. + * @return {@link MemStoreSnapshot} + */ + @Override + public MemStoreSnapshot snapshot() { + MutableSegment active = getActive(); + // If snapshot currently has entries, then flusher failed or didn't call + // cleanup. Log a warning. + if (!getSnapshot().isEmpty()) { + LOG.warn("Snapshot called again without clearing previous. " + + "Doing nothing. Another ongoing flush or did we fail last attempt?"); + } else { + LOG.info("FLUSHING TO DISK: region "+ getRegionServices().getRegionInfo() + .getRegionNameAsString() + "store: "+ getFamilyName()); + stopCompaction(); + pushActiveToPipeline(active); + snapshotId = EnvironmentEdgeManager.currentTime(); + pushTailToSnapshot(); + } + return new MemStoreSnapshot(snapshotId, getSnapshot()); + } + + /** + * On flush, how much memory we will clear. + * @return size of data that is going to be flushed + */ + @Override public long getFlushableSize() { + long snapshotSize = getSnapshot().getSize(); + if(snapshotSize == 0) { + //if snapshot is empty the tail of the pipeline is flushed + snapshotSize = pipeline.getTailSize(); + } + return snapshotSize > 0 ? snapshotSize : keySize(); + } + + @Override + public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfGreater) { + long minSequenceId = pipeline.getMinSequenceId(); + if(minSequenceId != Long.MAX_VALUE) { + byte[] encodedRegionName = getRegionServices().getRegionInfo().getEncodedNameAsBytes(); + byte[] familyName = getFamilyNameInByte(); + WAL WAL = getRegionServices().getWAL(); + if (WAL != null) { + WAL.updateStore(encodedRegionName, familyName, minSequenceId, onlyIfGreater); + } + } + } + + @Override + public List<Segment> getSegments() { + List<Segment> pipelineList = pipeline.getSegments(); + List<Segment> list = new LinkedList<Segment>(); + list.add(getActive()); + list.addAll(pipelineList); + list.add(getSnapshot()); + return list; + } + + public void setInMemoryFlushInProgress(boolean inMemoryFlushInProgress) { + this.inMemoryFlushInProgress.set(inMemoryFlushInProgress); + } + + public void swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result) { + pipeline.swap(versionedList, result); + } + + public boolean hasCompactibleSegments() { + return !pipeline.isEmpty(); + } + + public VersionedSegmentsList getCompactibleSegments() { + return pipeline.getVersionedList(); + } + + public long getSmallestReadPoint() { + return store.getSmallestReadPoint(); + } + + public Store getStore() { + return store; + } + + public String getFamilyName() { + return Bytes.toString(getFamilyNameInByte()); + } + + @Override + /* + * Scanners are ordered from 0 (oldest) to newest in increasing order. 
+ */ + protected List<SegmentScanner> getListOfScanners(long readPt) throws IOException { + List<Segment> pipelineList = pipeline.getSegments(); + long order = pipelineList.size(); + LinkedList<SegmentScanner> list = new LinkedList<SegmentScanner>(); + list.add(getActive().getSegmentScanner(readPt, order+1)); + for (Segment item : pipelineList) { + list.add(item.getSegmentScanner(readPt, order)); + order--; + } + list.add(getSnapshot().getSegmentScanner(readPt, order)); + return list; + } + + /** + * Check whether anything need to be done based on the current active set size. + * The method is invoked upon every addition to the active set. + * For CompactingMemStore, flush the active set to the read-only memory if it's + * size is above threshold + */ + @Override + protected void checkActiveSize() { + if (shouldFlushInMemory()) { + /* The thread is dispatched to flush-in-memory. This cannot be done + * on the same thread, because for flush-in-memory we require updatesLock + * in exclusive mode while this method (checkActiveSize) is invoked holding updatesLock + * in the shared mode. */ + InMemoryFlushRunnable runnable = new InMemoryFlushRunnable(); + LOG.info("Dispatching the MemStore in-memory flush for store " + store.getColumnFamilyName()); + getPool().execute(runnable); + // guard against queuing same old compactions over and over again + inMemoryFlushInProgress.set(true); + } + } + + // internally used method, externally visible only for tests + // when invoked directly from tests it must be verified that the caller doesn't hold updatesLock, + // otherwise there is a deadlock + @VisibleForTesting + void flushInMemory() throws IOException { + // Phase I: Update the pipeline + getRegionServices().blockUpdates(); + try { + MutableSegment active = getActive(); + LOG.info("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline, " + + "and initiating compaction."); + pushActiveToPipeline(active); + } finally { + getRegionServices().unblockUpdates(); + } + // Phase II: Compact the pipeline + try { + if (allowCompaction.get()) { + // setting the inMemoryFlushInProgress flag again for the case this method is invoked + // directly (only in tests) in the common path setting from true to true is idempotent + inMemoryFlushInProgress.set(true); + // Speculative compaction execution, may be interrupted if flush is forced while + // compaction is in progress + compactor.startCompaction(); + } + } catch (IOException e) { + LOG.warn("Unable to run memstore compaction. 
region " + + getRegionServices().getRegionInfo().getRegionNameAsString() + + "store: "+ getFamilyName(), e); + } + } + + private byte[] getFamilyNameInByte() { + return store.getFamily().getName(); + } + + private ThreadPoolExecutor getPool() { + return getRegionServices().getInMemoryCompactionPool(); + } + + private boolean shouldFlushInMemory() { + if(getActive().getSize() > inmemoryFlushSize) { + // size above flush threshold + return (allowCompaction.get() && !inMemoryFlushInProgress.get()); + } + return false; + } + + /** + * The request to cancel the compaction asynchronous task (caused by in-memory flush) + * The compaction may still happen if the request was sent too late + * Non-blocking request + */ + private void stopCompaction() { + if (inMemoryFlushInProgress.get()) { + compactor.stopCompact(); + inMemoryFlushInProgress.set(false); + } + } + + private void pushActiveToPipeline(MutableSegment active) { + if (!active.isEmpty()) { + long delta = DEEP_OVERHEAD_PER_PIPELINE_ITEM - DEEP_OVERHEAD; + active.setSize(active.getSize() + delta); + pipeline.pushHead(active); + resetCellSet(); + } + } + + private void pushTailToSnapshot() { + ImmutableSegment tail = pipeline.pullTail(); + if (!tail.isEmpty()) { + setSnapshot(tail); + long size = getSegmentSize(tail); + setSnapshotSize(size); + } + } + + private RegionServicesForStores getRegionServices() { + return regionServices; + } + + /** + * The in-memory-flusher thread performs the flush asynchronously. + * There is at most one thread per memstore instance. + * It takes the updatesLock exclusively, pushes active into the pipeline, releases updatesLock + * and compacts the pipeline. + */ + private class InMemoryFlushRunnable implements Runnable { + + @Override public void run() { + try { + flushInMemory(); + } catch (IOException e) { + LOG.warn("Unable to run memstore compaction. region " + + getRegionServices().getRegionInfo().getRegionNameAsString() + + "store: "+ getFamilyName(), e); + } + } + } + + //---------------------------------------------------------------------- + //methods for tests + //---------------------------------------------------------------------- + boolean isMemStoreFlushingInMemory() { + return inMemoryFlushInProgress.get(); + } + + void disableCompaction() { + allowCompaction.set(false); + } + + void enableCompaction() { + allowCompaction.set(true); + } + + /** + * @param cell Find the row that comes after this one. If null, we return the + * first. + * @return Next row or null if none found. + */ + Cell getNextRow(final Cell cell) { + Cell lowest = null; + List<Segment> segments = getSegments(); + for (Segment segment : segments) { + if (lowest == null) { + lowest = getNextRow(cell, segment.getCellSet()); + } else { + lowest = getLowest(lowest, getNextRow(cell, segment.getCellSet())); + } + } + return lowest; + } + + // debug method + private void debug() { + String msg = "active size="+getActive().getSize(); + msg += " threshold="+IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT* inmemoryFlushSize; + msg += " allow compaction is "+ (allowCompaction.get() ? "true" : "false"); + msg += " inMemoryFlushInProgress is "+ (inMemoryFlushInProgress.get() ? 
"true" : "false"); + LOG.debug(msg); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java new file mode 100644 index 0000000..e33ceae --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -0,0 +1,190 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * The compaction pipeline of a {@link CompactingMemStore}, is a FIFO queue of segments. + * It supports pushing a segment at the head of the pipeline and pulling a segment from the + * tail to flush to disk. + * It also supports swap operation to allow the compactor swap a subset of the segments with a new + * (compacted) one. This swap succeeds only if the version number passed with the list of segments + * to swap is the same as the current version of the pipeline. + * The pipeline version is updated whenever swapping segments or pulling the segment at the tail. + */ +@InterfaceAudience.Private +public class CompactionPipeline { + private static final Log LOG = LogFactory.getLog(CompactionPipeline.class); + + private final RegionServicesForStores region; + private LinkedList<ImmutableSegment> pipeline; + private long version; + + private static final ImmutableSegment EMPTY_MEM_STORE_SEGMENT = SegmentFactory.instance() + .createImmutableSegment(null, + CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM); + + public CompactionPipeline(RegionServicesForStores region) { + this.region = region; + this.pipeline = new LinkedList<ImmutableSegment>(); + this.version = 0; + } + + public boolean pushHead(MutableSegment segment) { + ImmutableSegment immutableSegment = SegmentFactory.instance(). 
+ createImmutableSegment(segment); + synchronized (pipeline){ + return addFirst(immutableSegment); + } + } + + public ImmutableSegment pullTail() { + synchronized (pipeline){ + if(pipeline.isEmpty()) { + return EMPTY_MEM_STORE_SEGMENT; + } + return removeLast(); + } + } + + public VersionedSegmentsList getVersionedList() { + synchronized (pipeline){ + LinkedList<ImmutableSegment> segmentList = new LinkedList<ImmutableSegment>(pipeline); + VersionedSegmentsList res = new VersionedSegmentsList(segmentList, version); + return res; + } + } + + /** + * Swaps the versioned list at the tail of the pipeline with the new compacted segment. + * Swapping only if there were no changes to the suffix of the list while it was compacted. + * @param versionedList tail of the pipeline that was compacted + * @param segment new compacted segment + * @return true iff swapped tail with new compacted segment + */ + public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment) { + if(versionedList.getVersion() != version) { + return false; + } + LinkedList<ImmutableSegment> suffix; + synchronized (pipeline){ + if(versionedList.getVersion() != version) { + return false; + } + suffix = versionedList.getStoreSegments(); + LOG.info("Swapping pipeline suffix with compacted item. " + +"Just before the swap the number of segments in pipeline is:" + +versionedList.getStoreSegments().size() + +", and the number of cells in new segment is:"+segment.getCellsCount()); + swapSuffix(suffix,segment); + } + if(region != null) { + // update the global memstore size counter + long suffixSize = CompactingMemStore.getSegmentsSize(suffix); + long newSize = CompactingMemStore.getSegmentSize(segment); + long delta = suffixSize - newSize; + long globalMemstoreSize = region.addAndGetGlobalMemstoreSize(-delta); + LOG.info("Suffix size: "+ suffixSize+" compacted item size: "+newSize+ + " globalMemstoreSize: "+globalMemstoreSize); + } + return true; + } + + public boolean isEmpty() { + return pipeline.isEmpty(); + } + + public List<Segment> getSegments() { + synchronized (pipeline){ + List<Segment> res = new LinkedList<Segment>(pipeline); + return res; + } + } + + public long size() { + return pipeline.size(); + } + + public long getMinSequenceId() { + long minSequenceId = Long.MAX_VALUE; + if(!isEmpty()) { + minSequenceId = pipeline.getLast().getMinSequenceId(); + } + return minSequenceId; + } + + public long getTailSize() { + if(isEmpty()) return 0; + return CompactingMemStore.getSegmentSize(pipeline.peekLast()); + } + + private void swapSuffix(LinkedList<ImmutableSegment> suffix, ImmutableSegment segment) { + version++; + for(Segment itemInSuffix : suffix) { + itemInSuffix.close(); + } + pipeline.removeAll(suffix); + pipeline.addLast(segment); + } + + private ImmutableSegment removeLast() { + version++; + return pipeline.removeLast(); + } + + private boolean addFirst(ImmutableSegment segment) { + pipeline.add(0,segment); + return true; + } + + // debug method + private boolean validateSuffixList(LinkedList<ImmutableSegment> suffix) { + if(suffix.isEmpty()) { + // empty suffix is always valid + return true; + } + + Iterator<ImmutableSegment> pipelineBackwardIterator = pipeline.descendingIterator(); + Iterator<ImmutableSegment> suffixBackwardIterator = suffix.descendingIterator(); + ImmutableSegment suffixCurrent; + ImmutableSegment pipelineCurrent; + for( ; suffixBackwardIterator.hasNext(); ) { + if(!pipelineBackwardIterator.hasNext()) { + // a suffix longer than pipeline is invalid + return false; + } + suffixCurrent 
= suffixBackwardIterator.next(); + pipelineCurrent = pipelineBackwardIterator.next(); + if(suffixCurrent != pipelineCurrent) { + // non-matching suffix + return false; + } + } + // suffix matches pipeline suffix + return true; + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index 3d65bca..cdc910e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -77,10 +77,9 @@ public class DefaultMemStore extends AbstractMemStore { /** * Creates a snapshot of the current memstore. * Snapshot must be cleared by call to {@link #clearSnapshot(long)} - * @param flushOpSeqId the sequence id that is attached to the flush operation in the wal */ @Override - public MemStoreSnapshot snapshot(long flushOpSeqId) { + public MemStoreSnapshot snapshot() { // If snapshot currently has entries, then flusher failed or didn't call // cleanup. Log a warning. if (!getSnapshot().isEmpty()) { @@ -90,7 +89,7 @@ public class DefaultMemStore extends AbstractMemStore { this.snapshotId = EnvironmentEdgeManager.currentTime(); if (!getActive().isEmpty()) { ImmutableSegment immutableSegment = SegmentFactory.instance(). - createImmutableSegment(getConfiguration(), getActive()); + createImmutableSegment(getActive()); setSnapshot(immutableSegment); setSnapshotSize(keySize()); resetCellSet(); @@ -99,16 +98,30 @@ public class DefaultMemStore extends AbstractMemStore { return new MemStoreSnapshot(this.snapshotId, getSnapshot()); } + /** + * On flush, how much memory we will clear from the active cell set. + * + * @return size of data that is going to be flushed from active set + */ + @Override + public long getFlushableSize() { + long snapshotSize = getSnapshot().getSize(); + return snapshotSize > 0 ? snapshotSize : keySize(); + } + @Override + /* + * Scanners are ordered from 0 (oldest) to newest in increasing order. + */ protected List<SegmentScanner> getListOfScanners(long readPt) throws IOException { List<SegmentScanner> list = new ArrayList<SegmentScanner>(2); - list.add(0, getActive().getSegmentScanner(readPt)); - list.add(1, getSnapshot().getSegmentScanner(readPt)); + list.add(0, getActive().getSegmentScanner(readPt, 1)); + list.add(1, getSnapshot().getSegmentScanner(readPt, 0)); return list; } @Override - protected List<Segment> getListOfSegments() throws IOException { + protected List<Segment> getSegments() throws IOException { List<Segment> list = new ArrayList<Segment>(2); list.add(0, getActive()); list.add(1, getSnapshot()); @@ -126,7 +139,7 @@ public class DefaultMemStore extends AbstractMemStore { getNextRow(cell, getSnapshot().getCellSet())); } - @Override public void updateLowestUnflushedSequenceIdInWal(boolean onlyIfMoreRecent) { + @Override public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent) { } /** @@ -150,6 +163,10 @@ public class DefaultMemStore extends AbstractMemStore { public void finalizeFlush() { } + @Override public boolean isSloppy() { + return false; + } + /** * Code to help figure if our approximation of object heap sizes is close * enough. See hbase-900. 
Fills memstores then waits so user can heap http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java new file mode 100644 index 0000000..362d0f9 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A {@link FlushPolicy} that only flushes store larger than a given threshold. If no store is large + * enough, then all stores will be flushed. + */ +public class FlushAllLargeStoresPolicy extends FlushLargeStoresPolicy{ + + private static final Log LOG = LogFactory.getLog(FlushAllLargeStoresPolicy.class); + + @Override + protected void configureForRegion(HRegion region) { + super.configureForRegion(region); + int familyNumber = region.getTableDesc().getFamilies().size(); + if (familyNumber <= 1) { + // No need to parse and set flush size lower bound if only one family + // Family number might also be zero in some of our unit test case + return; + } + this.flushSizeLowerBound = getFlushSizeLowerBound(region); + } + + @Override + public Collection<Store> selectStoresToFlush() { + // no need to select stores if only one family + if (region.getTableDesc().getFamilies().size() == 1) { + return region.stores.values(); + } + // start selection + Collection<Store> stores = region.stores.values(); + Set<Store> specificStoresToFlush = new HashSet<Store>(); + for (Store store : stores) { + if (shouldFlush(store)) { + specificStoresToFlush.add(store); + } + } + if (!specificStoresToFlush.isEmpty()) return specificStoresToFlush; + + // Didn't find any CFs which were above the threshold for selection. 
+ if (LOG.isDebugEnabled()) { + LOG.debug("Since none of the CFs were above the size, flushing all."); + } + return stores; + } + + @Override + protected boolean shouldFlush(Store store) { + return (super.shouldFlush(store) || region.shouldFlushStore(store)); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java index b4d47c7..49cb747 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java @@ -17,10 +17,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.util.Collection; -import java.util.HashSet; -import java.util.Set; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -31,7 +27,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; * enough, then all stores will be flushed. */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) -public class FlushLargeStoresPolicy extends FlushPolicy { +public abstract class FlushLargeStoresPolicy extends FlushPolicy { private static final Log LOG = LogFactory.getLog(FlushLargeStoresPolicy.class); @@ -41,20 +37,13 @@ public class FlushLargeStoresPolicy extends FlushPolicy { public static final String HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN = "hbase.hregion.percolumnfamilyflush.size.lower.bound.min"; - private static final long DEFAULT_HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN = + public static final long DEFAULT_HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN = 1024 * 1024 * 16L; - private long flushSizeLowerBound = -1; + protected long flushSizeLowerBound = -1; - @Override - protected void configureForRegion(HRegion region) { - super.configureForRegion(region); + protected long getFlushSizeLowerBound(HRegion region) { int familyNumber = region.getTableDesc().getFamilies().size(); - if (familyNumber <= 1) { - // No need to parse and set flush size lower bound if only one family - // Family number might also be zero in some of our unit test case - return; - } // For multiple families, lower bound is the "average flush size" by default // unless setting in configuration is larger. 
long flushSizeLowerBound = region.getMemstoreFlushSize() / familyNumber; @@ -85,44 +74,19 @@ public class FlushLargeStoresPolicy extends FlushPolicy { } } - this.flushSizeLowerBound = flushSizeLowerBound; + return flushSizeLowerBound; } - private boolean shouldFlush(Store store) { + protected boolean shouldFlush(Store store) { if (store.getMemStoreSize() > this.flushSizeLowerBound) { if (LOG.isDebugEnabled()) { LOG.debug("Flush Column Family " + store.getColumnFamilyName() + " of " + - region.getRegionInfo().getEncodedName() + " because memstoreSize=" + - store.getMemStoreSize() + " > lower bound=" + this.flushSizeLowerBound); + region.getRegionInfo().getEncodedName() + " because memstoreSize=" + + store.getMemStoreSize() + " > lower bound=" + this.flushSizeLowerBound); } return true; } - return region.shouldFlushStore(store); - } - - @Override - public Collection<Store> selectStoresToFlush() { - // no need to select stores if only one family - if (region.getTableDesc().getFamilies().size() == 1) { - return region.stores.values(); - } - // start selection - Collection<Store> stores = region.stores.values(); - Set<Store> specificStoresToFlush = new HashSet<Store>(); - for (Store store : stores) { - if (shouldFlush(store)) { - specificStoresToFlush.add(store); - } - } - // Didn't find any CFs which were above the threshold for selection. - if (specificStoresToFlush.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Since none of the CFs were above the size, flushing all."); - } - return stores; - } else { - return specificStoresToFlush; - } + return false; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java new file mode 100644 index 0000000..2921f23 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.Collection; +import java.util.HashSet; + +/** + * A {@link FlushPolicy} that only flushes store larger than a given threshold. If no store is large + * enough, then all stores will be flushed. + * Gives priority to selecting regular stores first, and only if no other + * option, selects sloppy stores which normaly require more memory. 
+ */ +public class FlushNonSloppyStoresFirstPolicy extends FlushLargeStoresPolicy { + + private Collection<Store> regularStores = new HashSet<>(); + private Collection<Store> sloppyStores = new HashSet<>(); + + /** + * @return the stores need to be flushed. + */ + @Override public Collection<Store> selectStoresToFlush() { + Collection<Store> specificStoresToFlush = new HashSet<Store>(); + for(Store store : regularStores) { + if(shouldFlush(store) || region.shouldFlushStore(store)) { + specificStoresToFlush.add(store); + } + } + if(!specificStoresToFlush.isEmpty()) return specificStoresToFlush; + for(Store store : sloppyStores) { + if(shouldFlush(store)) { + specificStoresToFlush.add(store); + } + } + if(!specificStoresToFlush.isEmpty()) return specificStoresToFlush; + return region.stores.values(); + } + + @Override + protected void configureForRegion(HRegion region) { + super.configureForRegion(region); + this.flushSizeLowerBound = getFlushSizeLowerBound(region); + for(Store store : region.stores.values()) { + if(store.getMemStore().isSloppy()) { + sloppyStores.add(store); + } else { + regularStores.add(store); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java index e80b696..b93594e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java @@ -41,7 +41,7 @@ public class FlushPolicyFactory { public static final String HBASE_FLUSH_POLICY_KEY = "hbase.regionserver.flush.policy"; private static final Class<? extends FlushPolicy> DEFAULT_FLUSH_POLICY_CLASS = - FlushLargeStoresPolicy.class; + FlushAllLargeStoresPolicy.class; /** * Create the FlushPolicy configured for the given table. 
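The change above makes FlushAllLargeStoresPolicy the default flush policy, while regions containing a sloppy (compacting) memstore are switched to FlushNonSloppyStoresFirstPolicy when they open (see the HRegion change below). As a minimal illustrative sketch only — not part of this patch — a deployment could also pin the policy explicitly through the hbase.regionserver.flush.policy key, mirroring the IntegrationTestBigLinkedList change earlier in this commit; the class and main method here are hypothetical scaffolding:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.FlushNonSloppyStoresFirstPolicy;
import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;

public class FlushPolicySetupSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Pin the region flush policy; when this key is unset, FlushPolicyFactory
    // now falls back to FlushAllLargeStoresPolicy as its default.
    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
        FlushNonSloppyStoresFirstPolicy.class.getName());
  }
}
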
http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index bfa1f80..8634e37 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@ -514,6 +514,10 @@ public class HMobStore extends HStore { @Override public void finalizeFlush() { } + @Override public MemStore getMemStore() { + return null; + } + public void updateCellsCountCompactedToMob(long count) { cellsCountCompactedToMob += count; } http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index e7a99a9..e5f9d50 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -19,6 +19,19 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.io.Closeables; +import com.google.protobuf.ByteString; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; +import com.google.protobuf.TextFormat; import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; @@ -182,19 +195,6 @@ import org.apache.hadoop.util.StringUtils; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.io.Closeables; -import com.google.protobuf.ByteString; -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; -import com.google.protobuf.Service; -import com.google.protobuf.TextFormat; @SuppressWarnings("deprecation") @InterfaceAudience.Private @@ -923,11 +923,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi }); } boolean allStoresOpened = false; + boolean hasSloppyStores = false; try { for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) { Future<HStore> future = completionService.take(); HStore store = future.get(); this.stores.put(store.getFamily().getName(), store); + MemStore memStore = store.getMemStore(); + if(memStore != null && memStore.isSloppy()) { + hasSloppyStores = true; + } long storeMaxSequenceId = store.getMaxSequenceId(); maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), @@ 
-941,6 +946,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } allStoresOpened = true; + if(hasSloppyStores) { + htableDescriptor.setFlushPolicyClassName(FlushNonSloppyStoresFirstPolicy.class + .getName()); + LOG.info("Setting FlushNonSloppyStoresFirstPolicy for the region=" + this); + } } catch (InterruptedException e) { throw (InterruptedIOException)new InterruptedIOException().initCause(e); } catch (ExecutionException e) { @@ -1457,22 +1467,30 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi LOG.debug("Updates disabled for region " + this); // Don't flush the cache if we are aborting if (!abort && canFlush) { + int failedfFlushCount = 0; int flushCount = 0; - while (this.memstoreSize.get() > 0) { + long tmp = 0; + long remainingSize = this.memstoreSize.get(); + while (remainingSize > 0) { try { - if (flushCount++ > 0) { - int actualFlushes = flushCount - 1; - if (actualFlushes > 5) { - // If we tried 5 times and are unable to clear memory, abort - // so we do not lose data - throw new DroppedSnapshotException("Failed clearing memory after " + - actualFlushes + " attempts on region: " + - Bytes.toStringBinary(getRegionInfo().getRegionName())); - } - LOG.info("Running extra flush, " + actualFlushes + - " (carrying snapshot?) " + this); - } internalFlushcache(status); + if(flushCount >0) { + LOG.info("Running extra flush, " + flushCount + + " (carrying snapshot?) " + this); + } + flushCount++; + tmp = this.memstoreSize.get(); + if (tmp >= remainingSize) { + failedfFlushCount++; + } + remainingSize = tmp; + if (failedfFlushCount > 5) { + // If we failed 5 times and are unable to clear memory, abort + // so we do not lose data + throw new DroppedSnapshotException("Failed clearing memory after " + + flushCount + " attempts on region: " + + Bytes.toStringBinary(getRegionInfo().getRegionName())); + } } catch (IOException ioe) { status.setStatus("Failed flush " + this + ", putting online again"); synchronized (writestate) { http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 8dfa0e0..2d1b9a6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -230,8 +230,15 @@ public class HStore implements Store { // to clone it? 
scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.comparator); String className = conf.get(MEMSTORE_CLASS_NAME, DefaultMemStore.class.getName()); - this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] { - Configuration.class, CellComparator.class }, new Object[] { conf, this.comparator }); + if (family.isInMemoryCompaction()) { + className = CompactingMemStore.class.getName(); + this.memstore = new CompactingMemStore(conf, this.comparator, this, + this.getHRegion().getRegionServicesForStores()); + } else { + this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] { + Configuration.class, CellComparator.class }, new Object[] { conf, this.comparator }); + } + LOG.info("Memstore class name is " + className); this.offPeakHours = OffPeakHours.getInstance(conf); // Setting up cache configuration for this family @@ -2149,7 +2156,7 @@ public class HStore implements Store { @Override public void prepare() { // passing the current sequence number of the wal - to allow bookkeeping in the memstore - this.snapshot = memstore.snapshot(cacheFlushSeqNum); + this.snapshot = memstore.snapshot(); this.cacheFlushCount = snapshot.getCellsCount(); this.cacheFlushSize = snapshot.getSize(); committedFiles = new ArrayList<Path>(1); @@ -2476,6 +2483,10 @@ public class HStore implements Store { memstore.finalizeFlush(); } + @Override public MemStore getMemStore() { + return memstore; + } + private void clearCompactedfiles(final List<StoreFile> filesToRemove) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("Clearing the compacted file " + filesToRemove + " from this store"); http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java index 70a608d..13d9fbf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java @@ -64,8 +64,4 @@ public class ImmutableSegment extends Segment { return this.timeRange.getMin(); } - @Override - protected void updateMetaInfo(Cell toAdd, long s) { - throw new IllegalAccessError("This is an immutable segment"); - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index ea72b7f..00d49d1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -42,19 +42,10 @@ public interface MemStore extends HeapSize { MemStoreSnapshot snapshot(); /** - * Creates a snapshot of the current memstore. Snapshot must be cleared by call to - * {@link #clearSnapshot(long)}. 
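With the change above the store picks its memstore implementation at construction time: the per-family in-memory-compaction flag forces CompactingMemStore, otherwise the class named in the configuration is instantiated reflectively. A rough, self-contained sketch of that selection is below; the Sketch* types and the configuration key string are stand-ins, not the HBase classes or constants.

import java.util.Map;

interface SketchMemStore { boolean isSloppy(); }

class SketchDefaultMemStore implements SketchMemStore {
  public boolean isSloppy() { return false; }
}

class SketchCompactingMemStore implements SketchMemStore {
  // A compacting memstore may temporarily hold more than the configured flush size.
  public boolean isSloppy() { return true; }
}

class MemStoreChooser {
  // Key name assumed for illustration only.
  static final String MEMSTORE_CLASS_NAME = "memstore.class";

  // The per-family flag wins over the globally configured memstore class.
  static SketchMemStore create(Map<String, String> conf, boolean familyInMemoryCompaction)
      throws Exception {
    if (familyInMemoryCompaction) {
      return new SketchCompactingMemStore();
    }
    String className =
        conf.getOrDefault(MEMSTORE_CLASS_NAME, SketchDefaultMemStore.class.getName());
    return (SketchMemStore) Class.forName(className).getDeclaredConstructor().newInstance();
  }
}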
- * @param flushOpSeqId the current sequence number of the wal; to be attached to the flushed - * segment - * @return {@link MemStoreSnapshot} - */ - MemStoreSnapshot snapshot(long flushOpSeqId); - - /** * Clears the current snapshot of the Memstore. * @param id * @throws UnexpectedStateException - * @see #snapshot(long) + * @see #snapshot() */ void clearSnapshot(long id) throws UnexpectedStateException; @@ -144,4 +135,7 @@ public interface MemStore extends HeapSize { * One example is to update the wal with sequence number that is known only at the store level. */ void finalizeFlush(); + + /* Return true if the memstore may need some extra memory space*/ + boolean isSloppy(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java new file mode 100644 index 0000000..88e067e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -0,0 +1,197 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Scan; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * The ongoing MemStore Compaction manager, dispatches a solo running compaction + * and interrupts the compaction if requested. + * The MemStoreScanner is used to traverse the compaction pipeline. The MemStoreScanner + * is included in internal store scanner, where all compaction logic is implemented. + * Threads safety: It is assumed that the compaction pipeline is immutable, + * therefore no special synchronization is required. 
+ */ +class MemStoreCompactor { + + private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class); + private CompactingMemStore compactingMemStore; + private MemStoreScanner scanner; // scanner for pipeline only + // scanner on top of MemStoreScanner that uses ScanQueryMatcher + private StoreScanner compactingScanner; + + // smallest read point for any ongoing MemStore scan + private long smallestReadPoint; + + // a static version of the segment list from the pipeline + private VersionedSegmentsList versionedList; + private final AtomicBoolean isInterrupted = new AtomicBoolean(false); + + public MemStoreCompactor(CompactingMemStore compactingMemStore) { + this.compactingMemStore = compactingMemStore; + } + + /** + * The request to dispatch the compaction asynchronous task. + * The method returns true if compaction was successfully dispatched, or false if there + * is already an ongoing compaction or nothing to compact. + */ + public boolean startCompaction() throws IOException { + if (!compactingMemStore.hasCompactibleSegments()) return false; // no compaction on empty + + List<SegmentScanner> scanners = new ArrayList<SegmentScanner>(); + // get the list of segments from the pipeline + versionedList = compactingMemStore.getCompactibleSegments(); + // the list is marked with specific version + + // create the list of scanners with maximally possible read point, meaning that + // all KVs are going to be returned by the pipeline traversing + for (Segment segment : versionedList.getStoreSegments()) { + scanners.add(segment.getSegmentScanner(Long.MAX_VALUE)); + } + scanner = + new MemStoreScanner(compactingMemStore, scanners, Long.MAX_VALUE, + MemStoreScanner.Type.COMPACT_FORWARD); + + smallestReadPoint = compactingMemStore.getSmallestReadPoint(); + compactingScanner = createScanner(compactingMemStore.getStore()); + + LOG.info("Starting the MemStore in-memory compaction for store " + + compactingMemStore.getStore().getColumnFamilyName()); + + doCompaction(); + return true; + } + + /** + * The request to cancel the compaction asynchronous task + * The compaction may still happen if the request was sent too late + * Non-blocking request + */ + public void stopCompact() { + isInterrupted.compareAndSet(false, true); + } + + + /** + * Close the scanners and clear the pointers in order to allow good + * garbage collection + */ + private void releaseResources() { + isInterrupted.set(false); + scanner.close(); + scanner = null; + compactingScanner.close(); + compactingScanner = null; + versionedList = null; + } + + /** + * The worker thread performs the compaction asynchronously. + * The solo (per compactor) thread only reads the compaction pipeline. + * There is at most one thread per memstore instance. 
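The class above runs at most one in-memory compaction per memstore and offers only a cooperative stop: stopCompact flips a flag that the worker checks between batches, so a late cancel request may still let the compaction finish. A condensed sketch of that control flow, with plain strings standing in for segments and a directly spawned thread standing in for the region's compaction pool:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

class InMemoryCompactionSketch {
  private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false);
  private final AtomicBoolean isInterrupted = new AtomicBoolean(false);
  private final AtomicReference<List<String>> pipeline =
      new AtomicReference<List<String>>(new ArrayList<>(Arrays.asList("segment-1", "segment-2")));

  // Returns false when there is nothing to compact or a compaction is already running.
  boolean startCompaction() {
    if (pipeline.get().isEmpty()) return false;
    if (!inMemoryFlushInProgress.compareAndSet(false, true)) return false;
    new Thread(this::doCompaction, "in-memory-compaction").start();
    return true;
  }

  // Non-blocking cancel request; it only takes effect at the next flag check.
  void stopCompact() {
    isInterrupted.compareAndSet(false, true);
  }

  private void doCompaction() {
    try {
      List<String> compacted = new ArrayList<>();
      for (String segment : pipeline.get()) {
        if (isInterrupted.get()) return;      // abandon the result, keep the old pipeline
        compacted.add("compacted(" + segment + ")");
      }
      if (!isInterrupted.get()) {
        pipeline.set(compacted);              // swap in the compacted result
      }
    } finally {
      isInterrupted.set(false);
      inMemoryFlushInProgress.set(false);
    }
  }
}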
+ */ + private void doCompaction() { + + ImmutableSegment result = SegmentFactory.instance() // create the scanner + .createImmutableSegment( + compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), + CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM); + + // the compaction processing + try { + // Phase I: create the compacted MutableCellSetSegment + compactSegments(result); + + // Phase II: swap the old compaction pipeline + if (!isInterrupted.get()) { + compactingMemStore.swapCompactedSegments(versionedList, result); + // update the wal so it can be truncated and not get too long + compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater + } + } catch (Exception e) { + LOG.debug("Interrupting the MemStore in-memory compaction for store " + compactingMemStore + .getFamilyName()); + Thread.currentThread().interrupt(); + return; + } finally { + releaseResources(); + compactingMemStore.setInMemoryFlushInProgress(false); + } + + } + + /** + * Creates the scanner for compacting the pipeline. + * + * @return the scanner + */ + private StoreScanner createScanner(Store store) throws IOException { + + Scan scan = new Scan(); + scan.setMaxVersions(); //Get all available versions + + StoreScanner internalScanner = + new StoreScanner(store, store.getScanInfo(), scan, Collections.singletonList(scanner), + ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP); + + return internalScanner; + } + + /** + * Updates the given single Segment using the internal store scanner, + * who in turn uses ScanQueryMatcher + */ + private void compactSegments(Segment result) throws IOException { + + List<Cell> kvs = new ArrayList<Cell>(); + // get the limit to the size of the groups to be returned by compactingScanner + int compactionKVMax = compactingMemStore.getConfiguration().getInt( + HConstants.COMPACTION_KV_MAX, + HConstants.COMPACTION_KV_MAX_DEFAULT); + + ScannerContext scannerContext = + ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); + + boolean hasMore; + do { + hasMore = compactingScanner.next(kvs, scannerContext); + if (!kvs.isEmpty()) { + for (Cell c : kvs) { + // The scanner is doing all the elimination logic + // now we just copy it to the new segment + Cell newKV = result.maybeCloneWithAllocator(c); + result.internalAdd(newKV); + + } + kvs.clear(); + } + } while (hasMore && (!isInterrupted.get())); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java index d3c35b3..72f7bf5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java @@ -18,8 +18,15 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.StealJobQueue; +import org.apache.hadoop.hbase.wal.WAL; /** * 
Services a Store needs from a Region. @@ -32,6 +39,20 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; @InterfaceStability.Evolving public class RegionServicesForStores { + private static final int POOL_SIZE = 10; + private static final ThreadPoolExecutor INMEMORY_COMPACTION_POOL = + new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 60, TimeUnit.SECONDS, + new StealJobQueue<Runnable>().getStealFromQueue(), + new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName(Thread.currentThread().getName() + + "-inmemoryCompactions-" + + System.currentTimeMillis()); + return t; + } + }); private final HRegion region; public RegionServicesForStores(HRegion region) { @@ -39,15 +60,37 @@ public class RegionServicesForStores { } public void blockUpdates() { - this.region.blockUpdates(); + region.blockUpdates(); } public void unblockUpdates() { - this.region.unblockUpdates(); + region.unblockUpdates(); } public long addAndGetGlobalMemstoreSize(long size) { - return this.region.addAndGetGlobalMemstoreSize(size); + return region.addAndGetGlobalMemstoreSize(size); + } + + public HRegionInfo getRegionInfo() { + return region.getRegionInfo(); } + public WAL getWAL() { + return region.getWAL(); + } + + public ThreadPoolExecutor getInMemoryCompactionPool() { return INMEMORY_COMPACTION_POOL; } + + public long getMemstoreFlushSize() { + return region.getMemstoreFlushSize(); + } + + public int getNumStores() { + return region.getStores().size(); + } + + // methods for tests + long getGlobalMemstoreTotalSize() { + return region.getMemstoreSize(); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java index dcad5a0..6435232 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java @@ -23,6 +23,7 @@ import java.util.SortedSet; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.KeyValue; @@ -40,8 +41,11 @@ import org.apache.hadoop.hbase.util.ByteRange; */ @InterfaceAudience.Private public abstract class Segment { + + private static final Log LOG = LogFactory.getLog(Segment.class); private volatile CellSet cellSet; private final CellComparator comparator; + private long minSequenceId; private volatile MemStoreLAB memStoreLAB; protected final AtomicLong size; protected volatile boolean tagsPresent; @@ -51,6 +55,7 @@ public abstract class Segment { long size) { this.cellSet = cellSet; this.comparator = comparator; + this.minSequenceId = Long.MAX_VALUE; this.memStoreLAB = memStoreLAB; this.size = new AtomicLong(size); this.tagsPresent = false; @@ -60,6 +65,7 @@ public abstract class Segment { protected Segment(Segment segment) { this.cellSet = segment.getCellSet(); this.comparator = segment.getComparator(); + this.minSequenceId = segment.getMinSequenceId(); this.memStoreLAB = segment.getMemStoreLAB(); this.size = new AtomicLong(segment.getSize()); this.tagsPresent = segment.isTagsPresent(); @@ -75,6 +81,14 @@ public abstract 
class Segment { } /** + * Creates the scanner for the given read point, and a specific order in a list + * @return a scanner for the given read point + */ + public SegmentScanner getSegmentScanner(long readPoint, long order) { + return new SegmentScanner(this, readPoint, order); + } + + /** * Returns whether the segment has any cells * @return whether the segment has any cells */ @@ -183,6 +197,10 @@ public abstract class Segment { size.addAndGet(delta); } + public long getMinSequenceId() { + return minSequenceId; + } + public TimeRangeTracker getTimeRangeTracker() { return this.timeRangeTracker; } @@ -231,10 +249,18 @@ public abstract class Segment { return s; } - /** - * Only mutable Segments implement this. - */ - protected abstract void updateMetaInfo(Cell toAdd, long s); + protected void updateMetaInfo(Cell toAdd, long s) { + getTimeRangeTracker().includeTimestamp(toAdd); + size.addAndGet(s); + minSequenceId = Math.min(minSequenceId, toAdd.getSequenceId()); + // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call. + // When we use ACL CP or Visibility CP which deals with Tags during + // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not + // parse the byte[] to identify the tags length. + if(toAdd.getTagsLength() > 0) { + tagsPresent = true; + } + } /** * Returns a subset of the segment cell set, which starts with the given cell http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java index 394ffa1..7ac80ae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java @@ -44,16 +44,16 @@ public final class SegmentFactory { final CellComparator comparator, long size) { MemStoreLAB memStoreLAB = getMemStoreLAB(conf); MutableSegment segment = generateMutableSegment(conf, comparator, memStoreLAB, size); - return createImmutableSegment(conf, segment); + return createImmutableSegment(segment); } public ImmutableSegment createImmutableSegment(CellComparator comparator, long size) { MutableSegment segment = generateMutableSegment(null, comparator, null, size); - return createImmutableSegment(null, segment); + return createImmutableSegment(segment); } - public ImmutableSegment createImmutableSegment(final Configuration conf, MutableSegment segment) { + public ImmutableSegment createImmutableSegment(MutableSegment segment) { return new ImmutableSegment(segment); } public MutableSegment createMutableSegment(final Configuration conf, http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java index 45f72d83..a04c1da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java @@ -62,6 +62,7 @@ public class SegmentScanner implements KeyValueScanner 
{ /** * @param scannerOrder see {@link KeyValueScanner#getScannerOrder()}. + * Scanners are ordered from 0 (oldest) to newest in increasing order. */ protected SegmentScanner(Segment segment, long readPoint, long scannerOrder) { this.segment = segment; @@ -84,7 +85,6 @@ public class SegmentScanner implements KeyValueScanner { throw new RuntimeException("current is invalid: read point is "+readPoint+", " + "while current sequence id is " +current.getSequenceId()); } - return current; } @@ -172,9 +172,8 @@ public class SegmentScanner implements KeyValueScanner { */ @Override public boolean seekToPreviousRow(Cell cell) throws IOException { - boolean keepSeeking = false; + boolean keepSeeking; Cell key = cell; - do { Cell firstKeyOnRow = CellUtil.createFirstOnRow(key); SortedSet<Cell> cellHead = segment.headSet(firstKeyOnRow); http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index b77a33b..3419937 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -520,4 +520,5 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf */ void finalizeFlush(); + MemStore getMemStore(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java new file mode 100644 index 0000000..9d7a723 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java @@ -0,0 +1,54 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.LinkedList; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * A list of segment managers coupled with the version of the memstore (version at the time it was + * created). + * This structure helps to guarantee that the compaction pipeline updates after the compaction is + * updated in a consistent (atomic) way. 
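VersionedSegmentsList pins the pipeline version at the moment the compactor read the segments, so the later swap can be refused if a flush changed the pipeline in the meantime. A minimal sketch of that optimistic, version-checked swap; the pipeline class below is illustrative and much simpler than the CompactionPipeline added by this patch.

import java.util.LinkedList;

class VersionedPipelineSketch {
  private final LinkedList<String> segments = new LinkedList<>();
  private long version = 0;

  // Snapshot of the current segments together with the version they were read at.
  synchronized Snapshot getVersionedList() {
    return new Snapshot(new LinkedList<>(segments), version);
  }

  // The swap succeeds only if nothing bumped the version since the snapshot was taken.
  synchronized boolean swapSuffix(Snapshot read, String compacted) {
    if (read.version != version) {
      return false;            // pipeline changed underneath us; drop the compacted result
    }
    segments.removeAll(read.segments);
    segments.addLast(compacted);
    version++;
    return true;
  }

  static final class Snapshot {
    final LinkedList<String> segments;
    final long version;
    Snapshot(LinkedList<String> segments, long version) {
      this.segments = segments;
      this.version = version;
    }
  }
}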
+ * Specifically, swapping some of the elements in a compaction pipeline with a new compacted + * element is permitted only if the pipeline version is the same as the version attached to the + * elements. + * + */ +@InterfaceAudience.Private +public class VersionedSegmentsList { + + private final LinkedList<ImmutableSegment> storeSegments; + private final long version; + + public VersionedSegmentsList( + LinkedList<ImmutableSegment> storeSegments, long version) { + this.storeSegments = storeSegments; + this.version = version; + } + + public LinkedList<ImmutableSegment> getStoreSegments() { + return storeSegments; + } + + public long getVersion() { + return version; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index ae48f6c..3aafc23 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; +import com.google.common.annotations.VisibleForTesting; + import java.io.IOException; import java.io.InterruptedIOException; import java.lang.management.ManagementFactory; @@ -41,8 +43,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; -import com.google.common.annotations.VisibleForTesting; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -772,6 +772,21 @@ public abstract class AbstractFSWAL<W> implements WAL { LOG.info("Closed WAL: " + toString()); } + /** + * updates the sequence number of a specific store. + * depending on the flag: replaces current seq number if the given seq id is bigger, + * or even if it is lower than existing one + * @param encodedRegionName + * @param familyName + * @param sequenceid + * @param onlyIfGreater + */ + @Override public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid, + boolean onlyIfGreater) { + sequenceIdAccounting.updateStore(encodedRegionName,familyName,sequenceid,onlyIfGreater); + } + + protected SyncFuture getSyncFuture(final long sequence, Span span) { SyncFuture syncFuture = this.syncFuturesByHandler.get(Thread.currentThread()); if (syncFuture == null) {
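The updateStore override above forwards to the WAL's per-store sequence id accounting; with onlyIfGreater set, the stored id only ever moves forward, which is how the compacting memstore advances the lowest unflushed sequence id after an in-memory compaction so the WAL can be truncated. A small sketch of the flag's semantics, using a plain map as a stand-in for SequenceIdAccounting:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class SequenceIdUpdateSketch {
  // family name -> lowest unflushed sequence id for that store (illustrative accounting)
  private final ConcurrentMap<String, Long> lowestUnflushedIds = new ConcurrentHashMap<>();

  void updateStore(String family, long sequenceId, boolean onlyIfGreater) {
    lowestUnflushedIds.merge(family, sequenceId, (current, candidate) ->
        onlyIfGreater ? Math.max(current, candidate) : candidate);
  }

  Long get(String family) {
    return lowestUnflushedIds.get(family);
  }
}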