HBASE-18294 Reduce global heap pressure: flush based on heap occupancy
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a458d7c4 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a458d7c4 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a458d7c4 Branch: refs/heads/branch-2 Commit: a458d7c40086fcde0ff9f6691a3b2f0a1a2a4dfc Parents: 0082f55 Author: eshcar <esh...@oath.com> Authored: Sun Feb 18 12:58:52 2018 +0200 Committer: eshcar <esh...@oath.com> Committed: Sun Feb 18 12:58:52 2018 +0200 ---------------------------------------------------------------------- .../hadoop/hbase/client/ConnectionUtils.java | 2 +- .../apache/hadoop/hbase/client/Mutation.java | 2 +- .../org/apache/hadoop/hbase/client/Result.java | 2 +- .../apache/hadoop/hbase/ByteBufferKeyValue.java | 2 +- .../apache/hadoop/hbase/PrivateCellUtil.java | 7 +- .../hbase/util/MapReduceExtendedCell.java | 2 +- .../hadoop/hbase/io/hfile/HFileBlockIndex.java | 2 +- .../hbase/regionserver/AbstractMemStore.java | 4 +- .../regionserver/CSLMImmutableSegment.java | 3 +- .../regionserver/CellArrayImmutableSegment.java | 9 +- .../regionserver/CellChunkImmutableSegment.java | 52 ++++++- .../hbase/regionserver/CompactingMemStore.java | 8 +- .../hbase/regionserver/CompactionPipeline.java | 47 +++--- .../regionserver/CompositeImmutableSegment.java | 12 +- .../hbase/regionserver/DefaultMemStore.java | 4 +- .../regionserver/FlushAllLargeStoresPolicy.java | 2 +- .../regionserver/FlushLargeStoresPolicy.java | 52 ++++--- .../FlushNonSloppyStoresFirstPolicy.java | 2 +- .../hadoop/hbase/regionserver/HRegion.java | 144 ++++++++++--------- .../hbase/regionserver/HRegionServer.java | 33 ++++- .../hadoop/hbase/regionserver/HStore.java | 2 +- .../regionserver/ImmutableMemStoreLAB.java | 12 ++ .../hbase/regionserver/ImmutableSegment.java | 4 + .../hbase/regionserver/MemStoreFlusher.java | 92 +++++++++--- .../hadoop/hbase/regionserver/MemStoreLAB.java | 8 +- .../hbase/regionserver/MemStoreLABImpl.java | 10 ++ .../hadoop/hbase/regionserver/MemStoreSize.java | 52 ++++++- .../hbase/regionserver/MemStoreSizing.java | 58 +++----- .../hbase/regionserver/MemStoreSnapshot.java | 16 +-- .../MetricsTableWrapperAggregateImpl.java | 2 +- .../hbase/regionserver/MutableSegment.java | 7 +- .../hadoop/hbase/regionserver/Region.java | 16 ++- .../regionserver/RegionServerAccounting.java | 45 +++--- .../regionserver/RegionServicesForStores.java | 4 +- .../hadoop/hbase/regionserver/Segment.java | 115 ++++++++++----- .../hadoop/hbase/regionserver/StoreScanner.java | 2 +- .../org/apache/hadoop/hbase/wal/WALEdit.java | 2 +- .../hadoop/hbase/TestGlobalMemStoreSize.java | 4 +- .../hbase/TestPartialResultsFromClientSide.java | 2 +- ...TestServerSideScanMetricsFromClientSide.java | 3 +- .../hbase/client/TestAsyncRegionAdminApi.java | 12 +- .../hadoop/hbase/client/TestClientPushback.java | 6 +- .../hbase/client/TestFlushFromClient.java | 14 +- .../hadoop/hbase/client/TestSizeFailures.java | 6 +- ...NegativeMemStoreSizeWithSlowCoprocessor.java | 3 +- .../regionserver/TestCompactingMemStore.java | 8 +- .../TestCompactingToCellFlatMapMemStore.java | 2 +- .../TestEndToEndSplitTransaction.java | 2 +- .../hadoop/hbase/regionserver/TestHRegion.java | 20 +-- .../regionserver/TestHRegionReplayEvents.java | 36 ++--- .../hadoop/hbase/regionserver/TestHStore.java | 6 +- .../regionserver/TestPerColumnFamilyFlush.java | 20 +-- .../TestRegionServerAccounting.java | 20 +-- .../hbase/regionserver/TestWALLockup.java | 4 +- .../TestWalAndCompactingMemStoreFlush.java | 18 +-- 55 files changed, 648 insertions(+), 376 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 1a093f8..c9e994f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -320,7 +320,7 @@ public final class ConnectionUtils { long estimatedHeapSizeOfResult = 0; // We don't make Iterator here for (Cell cell : rs.rawCells()) { - estimatedHeapSizeOfResult += PrivateCellUtil.estimatedHeapSizeOf(cell); + estimatedHeapSizeOfResult += PrivateCellUtil.estimatedSizeOfCell(cell); } return estimatedHeapSizeOfResult; } http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java index 4398fd6..09000ac 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java @@ -488,7 +488,7 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C size * ClassSize.REFERENCE); for(Cell cell : entry.getValue()) { - heapsize += PrivateCellUtil.estimatedHeapSizeOf(cell); + heapsize += PrivateCellUtil.estimatedSizeOfCell(cell); } } heapsize += getAttributeSize(); http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java index d30c25f..832689e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java @@ -859,7 +859,7 @@ public class Result implements CellScannable, CellScanner { return size; } for (Cell c : result.rawCells()) { - size += PrivateCellUtil.estimatedHeapSizeOf(c); + size += PrivateCellUtil.estimatedSizeOfCell(c); } return size; } http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-common/src/main/java/org/apache/hadoop/hbase/ByteBufferKeyValue.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ByteBufferKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ByteBufferKeyValue.java index c82ed8d..760d02c 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ByteBufferKeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ByteBufferKeyValue.java @@ -255,7 +255,7 @@ public class ByteBufferKeyValue extends ByteBufferExtendedCell { if (this.buf.hasArray()) { return ClassSize.align(FIXED_OVERHEAD + length); } - return ClassSize.align(FIXED_OVERHEAD); + return ClassSize.align(FIXED_OVERHEAD) + KeyValueUtil.length(this); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-common/src/main/java/org/apache/hadoop/hbase/PrivateCellUtil.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/PrivateCellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/PrivateCellUtil.java index f2e749e..c8e04ce 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/PrivateCellUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/PrivateCellUtil.java @@ -250,7 +250,7 @@ public final class PrivateCellUtil { @Override public long heapSize() { - long sum = HEAP_SIZE_OVERHEAD + estimatedHeapSizeOf(cell); + long sum = HEAP_SIZE_OVERHEAD + estimatedSizeOfCell(cell); if (this.tags != null) { sum += ClassSize.sizeOf(this.tags); } @@ -446,7 +446,7 @@ public final class PrivateCellUtil { @Override public long heapSize() { - long sum = HEAP_SIZE_OVERHEAD + estimatedHeapSizeOf(cell); + long sum = HEAP_SIZE_OVERHEAD + estimatedSizeOfCell(cell); // this.tags is on heap byte[] if (this.tags != null) { sum += ClassSize.sizeOf(this.tags); @@ -2783,10 +2783,11 @@ public final class PrivateCellUtil { * {@link HeapSize} we call {@link HeapSize#heapSize()} so cell can give a correct value. In other * cases we just consider the bytes occupied by the cell components ie. row, CF, qualifier, * timestamp, type, value and tags. + * Note that this can be the JVM heap space (on-heap) or the OS heap (off-heap) * @param cell * @return estimate of the heap space */ - public static long estimatedHeapSizeOf(final Cell cell) { + public static long estimatedSizeOfCell(final Cell cell) { if (cell instanceof HeapSize) { return ((HeapSize) cell).heapSize(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/util/MapReduceExtendedCell.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/util/MapReduceExtendedCell.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/util/MapReduceExtendedCell.java index 73eb7d8..75b57f4 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/util/MapReduceExtendedCell.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/util/MapReduceExtendedCell.java @@ -241,7 +241,7 @@ public class MapReduceExtendedCell extends ByteBufferExtendedCell { @Override public long heapSize() { - return PrivateCellUtil.estimatedHeapSizeOf(cell); + return PrivateCellUtil.estimatedSizeOfCell(cell); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java index 7b8815f..e8818be 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java @@ -266,7 +266,7 @@ public class HFileBlockIndex { // Adding blockKeys for (Cell key : blockKeys) { - heapSize += ClassSize.align(PrivateCellUtil.estimatedHeapSizeOf(key)); + heapSize += ClassSize.align(PrivateCellUtil.estimatedSizeOfCell(key)); } } // Add comparator and the midkey atomicreference http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java index 6dbe0a8..e6fd04d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java @@ -171,7 +171,9 @@ public abstract class AbstractMemStore implements MemStore { } MemStoreSizing getSnapshotSizing() { - return new MemStoreSizing(this.snapshot.keySize(), this.snapshot.heapSize()); + return new MemStoreSizing(this.snapshot.keySize(), + this.snapshot.heapSize(), + this.snapshot.offHeapSize()); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java index b5fe033..6af84cb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CSLMImmutableSegment.java @@ -39,7 +39,8 @@ public class CSLMImmutableSegment extends ImmutableSegment { protected CSLMImmutableSegment(Segment segment) { super(segment); // update the segment metadata heap size - incSize(0, -MutableSegment.DEEP_OVERHEAD + DEEP_OVERHEAD_CSLM); + long indexOverhead = -MutableSegment.DEEP_OVERHEAD + DEEP_OVERHEAD_CSLM; + incSize(0, indexOverhead, 0); // CSLM is always on-heap } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java index 7e00899..4631200 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java @@ -45,7 +45,7 @@ public class CellArrayImmutableSegment extends ImmutableSegment { protected CellArrayImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator, MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactionStrategy.Action action) { super(null, comparator, memStoreLAB); // initiailize the CellSet with NULL - incSize(0, DEEP_OVERHEAD_CAM); + incSize(0, DEEP_OVERHEAD_CAM, 0); // CAM is always on-heap // build the new CellSet based on CellArrayMap and update the CellSet of the new Segment initializeCellSet(numOfCells, iterator, action); } @@ -58,7 +58,8 @@ public class CellArrayImmutableSegment extends ImmutableSegment { protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemStoreSizing memstoreSizing, MemStoreCompactionStrategy.Action action) { super(segment); // initiailize the upper class - incSize(0, DEEP_OVERHEAD_CAM - CSLMImmutableSegment.DEEP_OVERHEAD_CSLM); + long indexOverhead = DEEP_OVERHEAD_CAM - CSLMImmutableSegment.DEEP_OVERHEAD_CSLM; + incSize(0, indexOverhead, 0); // CAM is always on-heap int numOfCells = segment.getCellsCount(); // build the new CellSet based on CellChunkMap and update the CellSet of this Segment reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet(), @@ -66,8 +67,8 @@ public class CellArrayImmutableSegment extends ImmutableSegment { // arrange the meta-data size, decrease all meta-data sizes related to SkipList; // add sizes of CellArrayMap entry (reinitializeCellSet doesn't take the care for the sizes) long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY); - incSize(0, newSegmentSizeDelta); - memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta); + incSize(0, newSegmentSizeDelta, 0); + memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta, 0); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java index bf9b191..53458f1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java @@ -53,7 +53,15 @@ public class CellChunkImmutableSegment extends ImmutableSegment { protected CellChunkImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator, MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactionStrategy.Action action) { super(null, comparator, memStoreLAB); // initialize the CellSet with NULL - incSize(0, DEEP_OVERHEAD_CCM); // initiate the heapSize with the size of the segment metadata + long indexOverhead = DEEP_OVERHEAD_CCM; + // memStoreLAB cannot be null in this class + boolean onHeap = getMemStoreLAB().isOnHeap(); + // initiate the heapSize with the size of the segment metadata + if(onHeap) { + incSize(0, indexOverhead, 0); + } else { + incSize(0, 0, indexOverhead); + } // build the new CellSet based on CellArrayMap and update the CellSet of the new Segment initializeCellSet(numOfCells, iterator, action); } @@ -66,7 +74,15 @@ public class CellChunkImmutableSegment extends ImmutableSegment { protected CellChunkImmutableSegment(CSLMImmutableSegment segment, MemStoreSizing memstoreSizing, MemStoreCompactionStrategy.Action action) { super(segment); // initiailize the upper class - incSize(0,-CSLMImmutableSegment.DEEP_OVERHEAD_CSLM + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM); + long indexOverhead = -CSLMImmutableSegment.DEEP_OVERHEAD_CSLM + DEEP_OVERHEAD_CCM; + // memStoreLAB cannot be null in this class + boolean onHeap = getMemStoreLAB().isOnHeap(); + // initiate the heapSize with the size of the segment metadata + if(onHeap) { + incSize(0, indexOverhead, 0); + } else { + incSize(0, -CSLMImmutableSegment.DEEP_OVERHEAD_CSLM, DEEP_OVERHEAD_CCM); + } int numOfCells = segment.getCellsCount(); // build the new CellSet based on CellChunkMap reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet(), @@ -75,9 +91,32 @@ public class CellChunkImmutableSegment extends ImmutableSegment { // add sizes of CellChunkMap entry, decrease also Cell object sizes // (reinitializeCellSet doesn't take the care for the sizes) long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY); + if(onHeap) { + incSize(0, newSegmentSizeDelta, 0); + memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta, 0); + } else { + incSize(0, 0, newSegmentSizeDelta); + memstoreSizing.incMemStoreSize(0, 0, newSegmentSizeDelta); + + } + } - incSize(0, newSegmentSizeDelta); - memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta); + @Override + protected long indexEntryOnHeapSize(boolean onHeap) { + if(onHeap) { + return indexEntrySize(); + } + // else the index is allocated off-heap + return 0; + } + + @Override + protected long indexEntryOffHeapSize(boolean offHeap) { + if(offHeap) { + return indexEntrySize(); + } + // else the index is allocated on-heap + return 0; } @Override @@ -257,13 +296,16 @@ public class CellChunkImmutableSegment extends ImmutableSegment { // The actual size of the cell is not added yet, and will be added (only in compaction) // in initializeCellSet#updateMetaInfo(). long oldHeapSize = heapSizeChange(cell, true); + long oldOffHeapSize = offHeapSizeChange(cell, true); long oldCellSize = getCellLength(cell); cell = maybeCloneWithAllocator(cell, true); long newHeapSize = heapSizeChange(cell, true); + long newOffHeapSize = offHeapSizeChange(cell, true); long newCellSize = getCellLength(cell); long heapOverhead = newHeapSize - oldHeapSize; + long offHeapOverhead = newOffHeapSize - oldOffHeapSize; //TODO: maybe need to update the dataSize of the region - incSize(newCellSize - oldCellSize, heapOverhead); + incSize(newCellSize - oldCellSize, heapOverhead, offHeapOverhead); return cell; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index 3cb4103..bcecdc7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -157,9 +157,9 @@ public class CompactingMemStore extends AbstractMemStore { @Override public MemStoreSize size() { MemStoreSizing memstoreSizing = new MemStoreSizing(); - memstoreSizing.incMemStoreSize(this.active.keySize(), this.active.heapSize()); + memstoreSizing.incMemStoreSize(active.getMemStoreSize()); for (Segment item : pipeline.getSegments()) { - memstoreSizing.incMemStoreSize(item.keySize(), item.heapSize()); + memstoreSizing.incMemStoreSize(item.getMemStoreSize()); } return memstoreSizing; } @@ -231,13 +231,13 @@ public class CompactingMemStore extends AbstractMemStore { // if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed if (compositeSnapshot) { snapshotSizing = pipeline.getPipelineSizing(); - snapshotSizing.incMemStoreSize(this.active.keySize(), this.active.heapSize()); + snapshotSizing.incMemStoreSize(active.getMemStoreSize()); } else { snapshotSizing = pipeline.getTailSizing(); } } return snapshotSizing.getDataSize() > 0 ? snapshotSizing - : new MemStoreSize(this.active.keySize(), this.active.heapSize()); + : new MemStoreSize(active.getMemStoreSize()); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 6cd5e31..2ffeb6c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -138,16 +138,25 @@ public class CompactionPipeline { if(segment != null) newDataSize = segment.keySize(); long dataSizeDelta = suffixDataSize - newDataSize; long suffixHeapSize = getSegmentsHeapSize(suffix); + long suffixOffHeapSize = getSegmentsOffHeapSize(suffix); long newHeapSize = 0; - if(segment != null) newHeapSize = segment.heapSize(); + long newOffHeapSize = 0; + if(segment != null) { + newHeapSize = segment.heapSize(); + newOffHeapSize = segment.offHeapSize(); + } + long offHeapSizeDelta = suffixOffHeapSize - newOffHeapSize; long heapSizeDelta = suffixHeapSize - newHeapSize; - region.addMemStoreSize(new MemStoreSizing(-dataSizeDelta, -heapSizeDelta)); - LOG.debug("Suffix data size={}, new segment data size={}, suffix heap size={}," + - "new segment heap size={}", - suffixDataSize, - newDataSize, - suffixHeapSize, - newHeapSize); + region.addMemStoreSize(new MemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta)); + LOG.debug("Suffix data size={}, new segment data size={}, " + + "suffix heap size={}," + "new segment heap size={}" + + "suffix off heap size={}," + "new segment off heap size={}" + , suffixDataSize + , newDataSize + , suffixHeapSize + , newHeapSize + , suffixOffHeapSize + , newOffHeapSize); } return true; } @@ -160,6 +169,14 @@ public class CompactionPipeline { return res; } + private static long getSegmentsOffHeapSize(List<? extends Segment> list) { + long res = 0; + for (Segment segment : list) { + res += segment.offHeapSize(); + } + return res; + } + private static long getSegmentsKeySize(List<? extends Segment> list) { long res = 0; for (Segment segment : list) { @@ -201,7 +218,8 @@ public class CompactionPipeline { if(region != null) { // update the global memstore size counter // upon flattening there is no change in the data size - region.addMemStoreSize(new MemStoreSize(0, newMemstoreAccounting.getHeapSize())); + region.addMemStoreSize(new MemStoreSize(0, newMemstoreAccounting.getHeapSize(), + newMemstoreAccounting.getOffHeapSize())); } LOG.debug("Compaction pipeline segment {} flattened", s); return true; @@ -239,19 +257,16 @@ public class CompactionPipeline { public MemStoreSizing getTailSizing() { LinkedList<? extends Segment> localCopy = readOnlyCopy; if (localCopy.isEmpty()) return new MemStoreSizing(); - return new MemStoreSizing(localCopy.peekLast().keySize(), localCopy.peekLast().heapSize()); + return new MemStoreSizing(localCopy.peekLast().getMemStoreSize()); } public MemStoreSizing getPipelineSizing() { - long keySize = 0; - long heapSize = 0; + MemStoreSizing memStoreSizing = new MemStoreSizing(); LinkedList<? extends Segment> localCopy = readOnlyCopy; - if (localCopy.isEmpty()) return new MemStoreSizing(); for (Segment segment : localCopy) { - keySize += segment.keySize(); - heapSize += segment.heapSize(); + memStoreSizing.incMemStoreSize(segment.getMemStoreSize()); } - return new MemStoreSizing(keySize, heapSize); + return memStoreSizing; } private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment, http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java index 8bd990a..b6bbb59 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java @@ -43,7 +43,7 @@ public class CompositeImmutableSegment extends ImmutableSegment { private long keySize = 0; public CompositeImmutableSegment(CellComparator comparator, List<ImmutableSegment> segments) { - super(comparator); + super(comparator, segments); this.segments = segments; for (ImmutableSegment s : segments) { this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMax()); @@ -87,14 +87,6 @@ public class CompositeImmutableSegment extends ImmutableSegment { } /** - * @return the first cell in the segment that has equal or greater key than the given cell - */ - @Override - public Cell getFirstAfter(Cell cell) { - throw new IllegalStateException("Not supported by CompositeImmutableScanner"); - } - - /** * Closing a segment before it is being discarded */ @Override @@ -206,7 +198,7 @@ public class CompositeImmutableSegment extends ImmutableSegment { * Updates the heap size counter of the segment by the given delta */ @Override - protected void incSize(long delta, long heapOverhead) { + protected void incSize(long delta, long heapOverhead, long offHeapOverhead) { throw new IllegalStateException("Not supported by CompositeImmutableScanner"); } http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index 061e4d0..9ef6a6c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -106,7 +106,7 @@ public class DefaultMemStore extends AbstractMemStore { public MemStoreSize getFlushableSize() { MemStoreSize snapshotSize = getSnapshotSize(); return snapshotSize.getDataSize() > 0 ? snapshotSize - : new MemStoreSize(keySize(), heapSize()); + : new MemStoreSize(active.getMemStoreSize()); } @Override @@ -155,7 +155,7 @@ public class DefaultMemStore extends AbstractMemStore { @Override public MemStoreSize size() { - return new MemStoreSize(this.active.keySize(), this.active.heapSize()); + return new MemStoreSize(active.getMemStoreSize()); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java index 0f01178..1ca20a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java @@ -44,7 +44,7 @@ public class FlushAllLargeStoresPolicy extends FlushLargeStoresPolicy { // Family number might also be zero in some of our unit test case return; } - this.flushSizeLowerBound = getFlushSizeLowerBound(region); + setFlushSizeLowerBounds(region); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java index 2d2de24..4da1857 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java @@ -43,10 +43,11 @@ public abstract class FlushLargeStoresPolicy extends FlushPolicy { protected long flushSizeLowerBound = -1; - protected long getFlushSizeLowerBound(HRegion region) { int familyNumber = region.getTableDescriptor().getColumnFamilyCount(); + protected void setFlushSizeLowerBounds(HRegion region) { + int familyNumber = region.getTableDescriptor().getColumnFamilyCount(); // For multiple families, lower bound is the "average flush size" by default // unless setting in configuration is larger. - long flushSizeLowerBound = region.getMemStoreFlushSize() / familyNumber; + flushSizeLowerBound = region.getMemStoreFlushSize() / familyNumber; long minimumLowerBound = getConf().getLong(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, DEFAULT_HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN); @@ -57,36 +58,45 @@ public abstract class FlushLargeStoresPolicy extends FlushPolicy { String flushedSizeLowerBoundString = region.getTableDescriptor().getValue(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND); if (flushedSizeLowerBoundString == null) { - LOG.debug("No {} set in table {} descriptor;" + - "using region.getMemStoreFlushSize/# of families ({}) instead.", - HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, - region.getTableDescriptor().getTableName(), - StringUtils.humanSize(flushSizeLowerBound) + ")"); + LOG.debug("No {} set in table {} descriptor;" + + "using region.getMemStoreFlushHeapSize/# of families ({}) " + + "instead." + , HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND + , region.getTableDescriptor().getTableName() + , StringUtils.humanSize(flushSizeLowerBound) + + ")"); } else { try { flushSizeLowerBound = Long.parseLong(flushedSizeLowerBoundString); } catch (NumberFormatException nfe) { // fall back for fault setting - LOG.warn("Number format exception parsing {} for table {}: {}, {}; " + - "using region.getMemStoreFlushSize/# of families ({}) instead.", - HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, - region.getTableDescriptor().getTableName(), - flushedSizeLowerBoundString, - nfe, - flushSizeLowerBound); + LOG.warn("Number format exception parsing {} for table {}: {}, {}; " + + "using region.getMemStoreFlushHeapSize/# of families ({}) " + + "and region.getMemStoreFlushOffHeapSize/# of families ({}) " + + "instead." + , HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND + , region.getTableDescriptor().getTableName() + , flushedSizeLowerBoundString + , nfe + , flushSizeLowerBound + ); } } - return flushSizeLowerBound; } protected boolean shouldFlush(HStore store) { - if (store.getMemStoreSize().getDataSize() > this.flushSizeLowerBound) { - LOG.debug("Flush {} of {}; memstoreSize={} > lowerBound={}", - store.getColumnFamilyName(), - region.getRegionInfo().getEncodedName(), - store.getMemStoreSize().getDataSize(), - this.flushSizeLowerBound); + if (store.getMemStoreSize().getHeapSize() + + store.getMemStoreSize().getOffHeapSize() > this.flushSizeLowerBound) { + LOG.debug("Flush {} of {}; " + + "heap memstoreSize={} +" + + "off heap memstoreSize={} > memstore lowerBound={}" + , store.getColumnFamilyName() + , region.getRegionInfo().getEncodedName() + , store.getMemStoreSize().getHeapSize() + , store.getMemStoreSize().getOffHeapSize() + , this.flushSizeLowerBound + ); return true; } return false; http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java index ed23e3d..e95de9d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java @@ -63,7 +63,7 @@ public class FlushNonSloppyStoresFirstPolicy extends FlushLargeStoresPolicy { @Override protected void configureForRegion(HRegion region) { super.configureForRegion(region); - this.flushSizeLowerBound = getFlushSizeLowerBound(region); + setFlushSizeLowerBounds(region); for (HStore store : region.stores.values()) { if (store.isSloppyMemStore()) { sloppyStores.add(store); http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 9464fdb..e26cc43 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -65,7 +65,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -287,7 +286,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // TODO: account for each registered handler in HeapSize computation private Map<String, com.google.protobuf.Service> coprocessorServiceHandlers = Maps.newHashMap(); - private final AtomicLong memstoreDataSize = new AtomicLong(0);// Track data size in all memstores + // Track data size in all memstores + private final MemStoreSizing memStoreSize = new MemStoreSizing(); private final RegionServicesForStores regionServicesForStores = new RegionServicesForStores(this); // Debug possible data loss due to WAL off @@ -829,12 +829,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (flushSize <= 0) { flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, - TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE); + TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE); } this.memstoreFlushSize = flushSize; - this.blockingMemStoreSize = this.memstoreFlushSize * - conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, - HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER); + long mult = conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, + HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER); + this.blockingMemStoreSize = this.memstoreFlushSize * mult; } /** @@ -1192,32 +1192,38 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi /** * Increase the size of mem store in this region and the size of global mem * store - * @return the size of memstore in this region */ - public long addAndGetMemStoreSize(MemStoreSize memstoreSize) { + public void incMemStoreSize(MemStoreSize memStoreSize) { if (this.rsAccounting != null) { - rsAccounting.incGlobalMemStoreSize(memstoreSize); + rsAccounting.incGlobalMemStoreSize(memStoreSize); } - long size = this.memstoreDataSize.addAndGet(memstoreSize.getDataSize()); - checkNegativeMemStoreDataSize(size, memstoreSize.getDataSize()); - return size; + long dataSize; + synchronized (this.memStoreSize) { + this.memStoreSize.incMemStoreSize(memStoreSize); + dataSize = this.memStoreSize.getDataSize(); + } + checkNegativeMemStoreDataSize(dataSize, memStoreSize.getDataSize()); } - public void decrMemStoreSize(MemStoreSize memstoreSize) { + public void decrMemStoreSize(MemStoreSize memStoreSize) { if (this.rsAccounting != null) { - rsAccounting.decGlobalMemStoreSize(memstoreSize); + rsAccounting.decGlobalMemStoreSize(memStoreSize); + } + long size; + synchronized (this.memStoreSize) { + this.memStoreSize.decMemStoreSize(memStoreSize); + size = this.memStoreSize.getDataSize(); } - long size = this.memstoreDataSize.addAndGet(-memstoreSize.getDataSize()); - checkNegativeMemStoreDataSize(size, -memstoreSize.getDataSize()); + checkNegativeMemStoreDataSize(size, -memStoreSize.getDataSize()); } - private void checkNegativeMemStoreDataSize(long memstoreDataSize, long delta) { - // This is extremely bad if we make memstoreSize negative. Log as much info on the offending + private void checkNegativeMemStoreDataSize(long memStoreDataSize, long delta) { + // This is extremely bad if we make memStoreSize negative. Log as much info on the offending // caller as possible. (memStoreSize might be a negative value already -- freeing memory) - if (memstoreDataSize < 0) { + if (memStoreDataSize < 0) { LOG.error("Asked to modify this region's (" + this.toString() - + ") memstoreSize to a negative value which is incorrect. Current memstoreSize=" - + (memstoreDataSize - delta) + ", delta=" + delta, new Exception()); + + ") memStoreSize to a negative value which is incorrect. Current memStoreSize=" + + (memStoreDataSize - delta) + ", delta=" + delta, new Exception()); } } @@ -1250,8 +1256,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } @Override - public long getMemStoreSize() { - return memstoreDataSize.get(); + public long getMemStoreDataSize() { + return memStoreSize.getDataSize(); + } + + @Override + public long getMemStoreHeapSize() { + return memStoreSize.getHeapSize(); + } + + @Override + public long getMemStoreOffHeapSize() { + return memStoreSize.getOffHeapSize(); } /** @return store services for this region, to access services required by store level needs */ @@ -1521,7 +1537,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi int failedfFlushCount = 0; int flushCount = 0; long tmp = 0; - long remainingSize = this.memstoreDataSize.get(); + long remainingSize = this.memStoreSize.getDataSize(); while (remainingSize > 0) { try { internalFlushcache(status); @@ -1530,7 +1546,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi " (carrying snapshot?) " + this); } flushCount++; - tmp = this.memstoreDataSize.get(); + tmp = this.memStoreSize.getDataSize(); if (tmp >= remainingSize) { failedfFlushCount++; } @@ -1570,7 +1586,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi getRegionServerServices().abort("Assertion failed while closing store " + getRegionInfo().getRegionNameAsString() + " " + store + ". flushableSize expected=0, actual= " + flushableSize - + ". Current memstoreSize=" + getMemStoreSize() + ". Maybe a coprocessor " + + ". Current memStoreSize=" + getMemStoreDataSize() + ". Maybe a coprocessor " + "operation failed and left the memstore in a partially updated state.", null); } } @@ -1613,9 +1629,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.closed.set(true); if (!canFlush) { - this.decrMemStoreSize(new MemStoreSizing(memstoreDataSize.get(), getMemStoreHeapSize())); - } else if (memstoreDataSize.get() != 0) { - LOG.error("Memstore size is " + memstoreDataSize.get()); + this.decrMemStoreSize(new MemStoreSize(memStoreSize)); + } else if (memStoreSize.getDataSize() != 0) { + LOG.error("Memstore data size is " + memStoreSize.getDataSize()); } if (coprocessorHost != null) { status.setStatus("Running coprocessor post-close hooks"); @@ -1635,10 +1651,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - private long getMemStoreHeapSize() { - return stores.values().stream().mapToLong(s -> s.getMemStoreSize().getHeapSize()).sum(); - } - /** Wait for all current flushes and compactions of the region to complete */ // TODO HBASE-18906. Check the usage (if any) in Phoenix and expose this or give alternate way for // Phoenix needs. @@ -1752,7 +1764,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @return True if its worth doing a flush before we put up the close flag. */ private boolean worthPreFlushing() { - return this.memstoreDataSize.get() > + return this.memStoreSize.getDataSize() > this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5); } @@ -2370,12 +2382,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // bulk loaded file between memory and existing hfiles. It wants a good seqeunceId that belongs // to no other that it can use to associate with the bulk load. Hence this little dance below // to go get one. - if (this.memstoreDataSize.get() <= 0) { + if (this.memStoreSize.getDataSize() <= 0) { // Take an update lock so no edits can come into memory just yet. this.updatesLock.writeLock().lock(); WriteEntry writeEntry = null; try { - if (this.memstoreDataSize.get() <= 0) { + if (this.memStoreSize.getDataSize() <= 0) { // Presume that if there are still no edits in the memstore, then there are no edits for // this region out in the WAL subsystem so no need to do any trickery clearing out // edits in the WAL sub-system. Up the sequence number so the resulting flush id is for @@ -2511,8 +2523,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi .append(StringUtils.byteDesc(store.getFlushableSize().getDataSize())); } } - LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() + - " column families, memstore=" + StringUtils.byteDesc(this.memstoreDataSize.get()) + + LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() + " column families," + + " memstore data size=" + StringUtils.byteDesc(this.memStoreSize.getDataSize()) + + " memstore heap size=" + StringUtils.byteDesc(this.memStoreSize.getHeapSize()) + ((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") + ((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + sequenceId)); } @@ -2699,11 +2712,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } long time = EnvironmentEdgeManager.currentTime() - startTime; - long memstoresize = this.memstoreDataSize.get(); - String msg = "Finished memstore flush of ~" - + StringUtils.byteDesc(prepareResult.totalFlushableSize.getDataSize()) + "/" - + prepareResult.totalFlushableSize.getDataSize() + ", currentsize=" - + StringUtils.byteDesc(memstoresize) + "/" + memstoresize + long flushableDataSize = prepareResult.totalFlushableSize.getDataSize(); + long flushableHeapSize = prepareResult.totalFlushableSize.getHeapSize(); + long memstoresize = this.memStoreSize.getDataSize(); + String msg = "Finished memstore flush." + + " Flushed data size ~" + StringUtils.byteDesc(flushableDataSize) + "/" + flushableDataSize + + " Flushed Heap size ~" + StringUtils.byteDesc(flushableHeapSize) + "/" + flushableHeapSize + + ", currentsize=" + StringUtils.byteDesc(memstoresize) + "/" + memstoresize + " for region " + this + " in " + time + "ms, sequenceid=" + flushOpSeqId + ", compaction requested=" + compactionRequested + ((wal == null) ? "; wal=null" : ""); @@ -3037,7 +3052,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return true; }); // update memStore size - region.addAndGetMemStoreSize(memStoreAccounting); + region.incMemStoreSize(memStoreAccounting); } public boolean isDone() { @@ -3806,8 +3821,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi initialized = true; } doMiniBatchMutate(batchOp); - long newSize = this.getMemStoreSize(); - requestFlushIfNeeded(newSize); + requestFlushIfNeeded(); } } finally { batchOp.closeRegionOperation(); @@ -4165,7 +4179,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // If catalog region, do not impose resource constraints or block updates. if (this.getRegionInfo().isMetaRegion()) return; - if (this.memstoreDataSize.get() > this.blockingMemStoreSize) { + if (this.memStoreSize.getHeapSize() + + this.memStoreSize.getOffHeapSize() > this.blockingMemStoreSize) { blockedRequestsCount.increment(); requestFlush(); // Don't print current limit because it will vary too much. The message is used as a key @@ -4293,8 +4308,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @param size * @return True if size is over the flush threshold */ - private boolean isFlushSize(final long size) { - return size > this.memstoreFlushSize; + private boolean isFlushSize(MemStoreSize size) { + return size.getHeapSize() + size.getOffHeapSize() > getMemStoreFlushSize(); } /** @@ -4585,7 +4600,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi rsAccounting.addRegionReplayEditsSize(getRegionInfo().getRegionName(), memstoreSize); } - flush = isFlushSize(this.addAndGetMemStoreSize(memstoreSize)); + incMemStoreSize(memstoreSize); + flush = isFlushSize(this.memStoreSize); if (flush) { internalFlushcache(null, currentEditSeqId, stores.values(), status, false, FlushLifeCycleTracker.DUMMY); @@ -6522,7 +6538,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi scannerContext.incrementBatchProgress(results.size()); for (Cell cell : results) { scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell), - PrivateCellUtil.estimatedHeapSizeOf(cell)); + PrivateCellUtil.estimatedSizeOfCell(cell)); } } @@ -7264,7 +7280,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return null; } ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder(); - stats.setMemStoreLoad((int) (Math.min(100, (this.memstoreDataSize.get() * 100) / this + stats.setMemStoreLoad((int) (Math.min(100, (this.memStoreSize.getHeapSize() * 100) / this .memstoreFlushSize))); if (rsServices.getHeapMemoryManager() != null) { // the HeapMemoryManager uses -0.0 to signal a problem asking the JVM, @@ -7412,8 +7428,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } finally { closeRegionOperation(); if (!mutations.isEmpty()) { - long newSize = this.addAndGetMemStoreSize(memstoreAccounting); - requestFlushIfNeeded(newSize); + this.incMemStoreSize(memstoreAccounting); + requestFlushIfNeeded(); } } } @@ -7566,9 +7582,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi rowLock.release(); } // Request a cache flush if over the limit. Do it outside update lock. - if (isFlushSize(addAndGetMemStoreSize(memstoreAccounting))) { - requestFlush(); - } + incMemStoreSize(memstoreAccounting); + requestFlushIfNeeded(); closeRegionOperation(op); if (this.metricsRegion != null) { switch (op) { @@ -7894,7 +7909,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.OBJECT + // closeLock (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing - (4 * ClassSize.ATOMIC_LONG) + // memStoreSize, numPutsWithoutWAL, dataInMemoryWithoutWAL, + (3 * ClassSize.ATOMIC_LONG) + // numPutsWithoutWAL, dataInMemoryWithoutWAL, // compactionsFailed (2 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, scannerReadPoints WriteState.HEAP_SIZE + // writestate @@ -7935,8 +7950,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc); if (coprocessorServiceHandlers.containsKey(serviceName)) { LOG.error("Coprocessor service " + serviceName + - " already registered, rejecting request from " + instance - ); + " already registered, rejecting request from " + instance); return false; } @@ -8211,8 +8225,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi dataInMemoryWithoutWAL.add(mutationSize); } - private void lock(final Lock lock) - throws RegionTooBusyException, InterruptedIOException { + private void lock(final Lock lock) throws RegionTooBusyException, InterruptedIOException { lock(lock, 1); } @@ -8401,6 +8414,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return this.memstoreFlushSize; } + //// method for debugging tests void throwException(String title, String regionName) { StringBuilder buf = new StringBuilder(); @@ -8416,7 +8430,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } buf.append("end-of-stores"); buf.append(", memstore size "); - buf.append(getMemStoreSize()); + buf.append(getMemStoreDataSize()); if (getRegionInfo().getRegionNameAsString().startsWith(regionName)) { throw new RuntimeException(buf.toString()); } @@ -8447,8 +8461,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi RpcServer.getRequestUser().orElse(null)); } - private void requestFlushIfNeeded(long memstoreTotalSize) throws RegionTooBusyException { - if (memstoreTotalSize > this.getMemStoreFlushSize()) { + private void requestFlushIfNeeded() throws RegionTooBusyException { + if(isFlushSize(memStoreSize)) { requestFlush(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 8d82916..c39a9af 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1600,7 +1600,7 @@ public class HRegionServer extends HasThread implements int storefiles = 0; int storeUncompressedSizeMB = 0; int storefileSizeMB = 0; - int memstoreSizeMB = (int) (r.getMemStoreSize() / 1024 / 1024); + int memstoreSizeMB = (int) (r.getMemStoreDataSize() / 1024 / 1024); long storefileIndexSizeKB = 0; int rootLevelIndexSizeKB = 0; int totalStaticIndexSizeKB = 0; @@ -2732,11 +2732,11 @@ public class HRegionServer extends HasThread implements } /** - * @return A new Map of online regions sorted by region size with the first entry being the - * biggest. If two regions are the same size, then the last one found wins; i.e. this method - * may NOT return all regions. + * @return A new Map of online regions sorted by region off-heap size with the first entry being + * the biggest. If two regions are the same size, then the last one found wins; i.e. this + * method may NOT return all regions. */ - SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedBySize() { + SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedByOffHeapSize() { // we'll sort the regions in reverse SortedMap<Long, HRegion> sortedRegions = new TreeMap<>( new Comparator<Long>() { @@ -2747,7 +2747,28 @@ public class HRegionServer extends HasThread implements }); // Copy over all regions. Regions are sorted by size with biggest first. for (HRegion region : this.onlineRegions.values()) { - sortedRegions.put(region.getMemStoreSize(), region); + sortedRegions.put(region.getMemStoreOffHeapSize(), region); + } + return sortedRegions; + } + + /** + * @return A new Map of online regions sorted by region heap size with the first entry being the + * biggest. If two regions are the same size, then the last one found wins; i.e. this method + * may NOT return all regions. + */ + SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedByOnHeapSize() { + // we'll sort the regions in reverse + SortedMap<Long, HRegion> sortedRegions = new TreeMap<>( + new Comparator<Long>() { + @Override + public int compare(Long a, Long b) { + return -1 * a.compareTo(b); + } + }); + // Copy over all regions. Regions are sorted by size with biggest first. + for (HRegion region : this.onlineRegions.values()) { + sortedRegions.put(region.getMemStoreHeapSize(), region); } return sortedRegions; } http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index f283a65..bef50b4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -2188,7 +2188,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat this.cacheFlushCount = snapshot.getCellsCount(); this.cacheFlushSize = snapshot.getDataSize(); committedFiles = new ArrayList<>(1); - return new MemStoreSize(snapshot.getDataSize(), snapshot.getHeapSize()); + return new MemStoreSize(snapshot.getMemStoreSize()); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java index 871f526..71648a0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java @@ -133,4 +133,16 @@ public class ImmutableMemStoreLAB implements MemStoreLAB { checkAndCloseMSLABs(count); } } + + @Override + public boolean isOnHeap() { + return !isOffHeap(); + } + + @Override + public boolean isOffHeap() { + return ChunkCreator.getInstance().isOffheap(); + } + + } http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java index c899eab..b781aab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java @@ -48,6 +48,10 @@ public abstract class ImmutableSegment extends Segment { super(comparator, TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC)); } + protected ImmutableSegment(CellComparator comparator, List<ImmutableSegment> segments) { + super(comparator, segments, TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC)); + } + /**------------------------------------------------------------------------ * C-tor to be used to build the derived classes */ http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index d7c7c5a..6e4191e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -87,6 +87,8 @@ class MemStoreFlusher implements FlushRequester { private final FlushHandler[] flushHandlers; private List<FlushRequestListener> flushRequestListeners = new ArrayList<>(1); + private FlushType flushType; + /** * @param conf * @param server @@ -116,6 +118,10 @@ class MemStoreFlusher implements FlushRequester { return this.updatesBlockedMsHighWater; } + public void setFlushType(FlushType flushType) { + this.flushType = flushType; + } + /** * The memstore across all regions has exceeded the low water mark. Pick * one region to flush and flush it synchronously (this is called from the @@ -123,7 +129,17 @@ class MemStoreFlusher implements FlushRequester { * @return true if successful */ private boolean flushOneForGlobalPressure() { - SortedMap<Long, HRegion> regionsBySize = server.getCopyOfOnlineRegionsSortedBySize(); + SortedMap<Long, HRegion> regionsBySize = null; + switch(flushType) { + case ABOVE_OFFHEAP_HIGHER_MARK: + case ABOVE_OFFHEAP_LOWER_MARK: + regionsBySize = server.getCopyOfOnlineRegionsSortedByOffHeapSize(); + break; + case ABOVE_ONHEAP_HIGHER_MARK: + case ABOVE_ONHEAP_LOWER_MARK: + default: + regionsBySize = server.getCopyOfOnlineRegionsSortedByOnHeapSize(); + } Set<HRegion> excludedRegions = new HashSet<>(); double secondaryMultiplier @@ -147,8 +163,25 @@ class MemStoreFlusher implements FlushRequester { } HRegion regionToFlush; + long bestAnyRegionSize; + long bestFlushableRegionSize; + switch(flushType) { + case ABOVE_OFFHEAP_HIGHER_MARK: + case ABOVE_OFFHEAP_LOWER_MARK: + bestAnyRegionSize = bestAnyRegion.getMemStoreOffHeapSize(); + bestFlushableRegionSize = bestFlushableRegion.getMemStoreOffHeapSize(); + break; + case ABOVE_ONHEAP_HIGHER_MARK: + case ABOVE_ONHEAP_LOWER_MARK: + bestAnyRegionSize = bestAnyRegion.getMemStoreHeapSize(); + bestFlushableRegionSize = bestFlushableRegion.getMemStoreHeapSize(); + break; + default: + bestAnyRegionSize = bestAnyRegion.getMemStoreDataSize(); + bestFlushableRegionSize = bestFlushableRegion.getMemStoreDataSize(); + } if (bestFlushableRegion != null && - bestAnyRegion.getMemStoreSize() > 2 * bestFlushableRegion.getMemStoreSize()) { + bestAnyRegionSize > 2 * bestFlushableRegionSize) { // Even if it's not supposed to be flushed, pick a region if it's more than twice // as big as the best flushable one - otherwise when we're under pressure we make // lots of little flushes and cause lots of compactions, etc, which just makes @@ -157,9 +190,10 @@ class MemStoreFlusher implements FlushRequester { LOG.debug("Under global heap pressure: " + "Region " + bestAnyRegion.getRegionInfo().getRegionNameAsString() + " has too many " + "store files, but is " - + TraditionalBinaryPrefix.long2String(bestAnyRegion.getMemStoreSize(), "", 1) + + TraditionalBinaryPrefix.long2String(bestAnyRegionSize, "", 1) + " vs best flushable region's " - + TraditionalBinaryPrefix.long2String(bestFlushableRegion.getMemStoreSize(), "", 1) + + TraditionalBinaryPrefix.long2String( + bestFlushableRegionSize, "", 1) + ". Choosing the bigger."); } regionToFlush = bestAnyRegion; @@ -171,19 +205,36 @@ class MemStoreFlusher implements FlushRequester { } } + long regionToFlushSize; + long bestRegionReplicaSize; + switch(flushType) { + case ABOVE_OFFHEAP_HIGHER_MARK: + case ABOVE_OFFHEAP_LOWER_MARK: + regionToFlushSize = regionToFlush.getMemStoreOffHeapSize(); + bestRegionReplicaSize = bestRegionReplica.getMemStoreOffHeapSize(); + break; + case ABOVE_ONHEAP_HIGHER_MARK: + case ABOVE_ONHEAP_LOWER_MARK: + regionToFlushSize = regionToFlush.getMemStoreHeapSize(); + bestRegionReplicaSize = bestRegionReplica.getMemStoreHeapSize(); + break; + default: + regionToFlushSize = regionToFlush.getMemStoreDataSize(); + bestRegionReplicaSize = bestRegionReplica.getMemStoreDataSize(); + } + Preconditions.checkState( - (regionToFlush != null && regionToFlush.getMemStoreSize() > 0) || - (bestRegionReplica != null && bestRegionReplica.getMemStoreSize() > 0)); + (regionToFlush != null && regionToFlushSize > 0) || + (bestRegionReplica != null && bestRegionReplicaSize > 0)); if (regionToFlush == null || (bestRegionReplica != null && ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) && - (bestRegionReplica.getMemStoreSize() - > secondaryMultiplier * regionToFlush.getMemStoreSize()))) { + (bestRegionReplicaSize > secondaryMultiplier * regionToFlushSize))) { LOG.info("Refreshing storefiles of region " + bestRegionReplica + - " due to global heap pressure. Total memstore datasize=" + + " due to global heap pressure. Total memstore off heap size=" + TraditionalBinaryPrefix.long2String( - server.getRegionServerAccounting().getGlobalMemStoreDataSize(), "", 1) + + server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) + " memstore heap size=" + TraditionalBinaryPrefix.long2String( server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1)); flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica); @@ -194,11 +245,15 @@ class MemStoreFlusher implements FlushRequester { } } else { LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. " + - "Total Memstore size=" + + "Flush type=" + flushType.toString() + + "Total Memstore Heap size=" + TraditionalBinaryPrefix.long2String( - server.getRegionServerAccounting().getGlobalMemStoreDataSize(), "", 1) + + server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1) + + "Total Memstore Off-Heap size=" + + TraditionalBinaryPrefix.long2String( + server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) + ", Region memstore size=" + - TraditionalBinaryPrefix.long2String(regionToFlush.getMemStoreSize(), "", 1)); + TraditionalBinaryPrefix.long2String(regionToFlushSize, "", 1)); flushedOne = flushRegion(regionToFlush, true, false, FlushLifeCycleTracker.DUMMY); if (!flushedOne) { @@ -582,6 +637,7 @@ class MemStoreFlusher implements FlushRequester { try { flushType = isAboveHighWaterMark(); while (flushType != FlushType.NORMAL && !server.isStopped()) { + server.cacheFlusher.setFlushType(flushType); if (!blocked) { startTime = EnvironmentEdgeManager.currentTime(); if (!server.getRegionServerAccounting().isOffheap()) { @@ -592,7 +648,7 @@ class MemStoreFlusher implements FlushRequester { switch (flushType) { case ABOVE_OFFHEAP_HIGHER_MARK: logMsg("the global offheap memstore datasize", - server.getRegionServerAccounting().getGlobalMemStoreDataSize(), + server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), server.getRegionServerAccounting().getGlobalMemStoreLimit()); break; case ABOVE_ONHEAP_HIGHER_MARK: @@ -633,8 +689,12 @@ class MemStoreFlusher implements FlushRequester { LOG.info("Unblocking updates for server " + server.toString()); } } - } else if (isAboveLowWaterMark() != FlushType.NORMAL) { - wakeupFlushThread(); + } else { + flushType = isAboveLowWaterMark(); + if (flushType != FlushType.NORMAL) { + server.cacheFlusher.setFlushType(flushType); + wakeupFlushThread(); + } } if(scope!= null) { scope.close(); http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java index 6bc8886..8b77981 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java @@ -114,7 +114,7 @@ public interface MemStoreLAB { */ Chunk getNewExternalJumboChunk(int size); - public static MemStoreLAB newInstance(Configuration conf) { + static MemStoreLAB newInstance(Configuration conf) { MemStoreLAB memStoreLAB = null; if (isEnabled(conf)) { String className = conf.get(MSLAB_CLASS_NAME, MemStoreLABImpl.class.getName()); @@ -124,7 +124,11 @@ public interface MemStoreLAB { return memStoreLAB; } - public static boolean isEnabled(Configuration conf) { + static boolean isEnabled(Configuration conf) { return conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT); } + + boolean isOnHeap(); + + boolean isOffHeap(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java index f7728ac..4ff0480 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java @@ -317,6 +317,16 @@ public class MemStoreLABImpl implements MemStoreLAB { return c; } + @Override + public boolean isOnHeap() { + return !isOffHeap(); + } + + @Override + public boolean isOffHeap() { + return this.chunkCreator.isOffheap(); + } + @VisibleForTesting Chunk getCurrentChunk() { return this.curChunk.get(); http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSize.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSize.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSize.java index 557a61a..382e6e9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSize.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSize.java @@ -27,29 +27,58 @@ import org.apache.yetus.audience.InterfaceAudience; */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) public class MemStoreSize { + // MemStore size tracks 3 sizes: + // (1) data size: the aggregated size of all key-value not including meta data such as + // index, time range etc. + // (2) heap size: the aggregated size of all data that is allocated on-heap including all + // key-values that reside on-heap and the metadata that resides on-heap + // (3) off-heap size: the aggregated size of all data that is allocated off-heap including all + // key-values that reside off-heap and the metadata that resides off-heap + // + // 3 examples to illustrate their usage: + // Consider a store with 100MB of key-values allocated on-heap and 20MB of metadata allocated + // on-heap. The counters are <100MB, 120MB, 0>, respectively. + // Consider a store with 100MB of key-values allocated off-heap and 20MB of metadata + // allocated on-heap (e.g, CAM index). The counters are <100MB, 20MB, 100MB>, respectively. + // Consider a store with 100MB of key-values from which 95MB are allocated off-heap and 5MB + // are allocated on-heap (e.g., due to upserts) and 20MB of metadata from which 15MB allocated + // off-heap (e.g, CCM index) and 5MB allocated on-heap (e.g, CSLM index in active). + // The counters are <100MB, 10MB, 110MB>, respectively. + /** *'dataSize' tracks the Cell's data bytes size alone (Key bytes, value bytes). A cell's data can * be in on heap or off heap area depending on the MSLAB and its configuration to be using on heap * or off heap LABs */ - protected long dataSize; + protected volatile long dataSize; /** 'heapSize' tracks all Cell's heap size occupancy. This will include Cell POJO heap overhead. * When Cells in on heap area, this will include the cells data size as well. */ - protected long heapSize; + protected volatile long heapSize; + + /** off-heap size: the aggregated size of all data that is allocated off-heap including all + * key-values that reside off-heap and the metadata that resides off-heap + */ + protected volatile long offHeapSize; public MemStoreSize() { - this(0L, 0L); + this(0L, 0L, 0L); } - public MemStoreSize(long dataSize, long heapSize) { + public MemStoreSize(long dataSize, long heapSize, long offHeapSize) { this.dataSize = dataSize; this.heapSize = heapSize; + this.offHeapSize = offHeapSize; } + protected MemStoreSize(MemStoreSize memStoreSize) { + this.dataSize = memStoreSize.dataSize; + this.heapSize = memStoreSize.heapSize; + this.offHeapSize = memStoreSize.offHeapSize; + } public boolean isEmpty() { - return this.dataSize == 0 && this.heapSize == 0; + return this.dataSize == 0 && this.heapSize == 0 && this.offHeapSize == 0; } public long getDataSize() { @@ -60,24 +89,33 @@ public class MemStoreSize { return this.heapSize; } + public long getOffHeapSize() { + return this.offHeapSize; + } + @Override public boolean equals(Object obj) { if (obj == null || getClass() != obj.getClass()) { return false; } MemStoreSize other = (MemStoreSize) obj; - return this.dataSize == other.dataSize && this.heapSize == other.heapSize; + return this.dataSize == other.dataSize + && this.heapSize == other.heapSize + && this.offHeapSize == other.offHeapSize; } @Override public int hashCode() { long h = 13 * this.dataSize; h = h + 14 * this.heapSize; + h = h + 15 * this.offHeapSize; return (int) h; } @Override public String toString() { - return "dataSize=" + this.dataSize + " , heapSize=" + this.heapSize; + return "dataSize=" + this.dataSize + + " , heapSize=" + this.heapSize + + " , offHeapSize=" + this.offHeapSize; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java index b13201d..0b3e925 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java @@ -28,23 +28,14 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public class MemStoreSizing extends MemStoreSize { public static final MemStoreSizing DUD = new MemStoreSizing() { - @Override - public void incMemStoreSize(MemStoreSize delta) { - incMemStoreSize(delta.getDataSize(), delta.getHeapSize()); - } - @Override - public void incMemStoreSize(long dataSizeDelta, long heapSizeDelta) { + @Override public void incMemStoreSize(long dataSizeDelta, long heapSizeDelta, + long offHeapSizeDelta) { throw new RuntimeException("I'm a dud, you can't use me!"); } - @Override - public void decMemStoreSize(MemStoreSize delta) { - decMemStoreSize(delta.getDataSize(), delta.getHeapSize()); - } - - @Override - public void decMemStoreSize(long dataSizeDelta, long heapSizeDelta) { + @Override public void decMemStoreSize(long dataSizeDelta, long heapSizeDelta, + long offHeapSizeDelta) { throw new RuntimeException("I'm a dud, you can't use me!"); } }; @@ -53,51 +44,38 @@ public class MemStoreSizing extends MemStoreSize { super(); } - public MemStoreSizing(long dataSize, long heapSize) { - super(dataSize, heapSize); + public MemStoreSizing(long dataSize, long heapSize, long offHeapSize) { + super(dataSize, heapSize, offHeapSize); } - public void incMemStoreSize(long dataSizeDelta, long heapSizeDelta) { + public MemStoreSizing(MemStoreSize memStoreSize) { + super(memStoreSize); + } + + public void incMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) { this.dataSize += dataSizeDelta; this.heapSize += heapSizeDelta; + this.offHeapSize += offHeapSizeDelta; } public void incMemStoreSize(MemStoreSize delta) { - incMemStoreSize(delta.getDataSize(), delta.getHeapSize()); + incMemStoreSize(delta.getDataSize(), delta.getHeapSize(), delta.getOffHeapSize()); } - public void decMemStoreSize(long dataSizeDelta, long heapSizeDelta) { + public void decMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta) { this.dataSize -= dataSizeDelta; this.heapSize -= heapSizeDelta; + this.offHeapSize -= offHeapSizeDelta; } public void decMemStoreSize(MemStoreSize delta) { - decMemStoreSize(delta.getDataSize(), delta.getHeapSize()); + decMemStoreSize(delta.getDataSize(), delta.getHeapSize(), delta.getOffHeapSize()); } public void empty() { this.dataSize = 0L; this.heapSize = 0L; + this.offHeapSize = 0L; } - @Override - public boolean equals(Object obj) { - if (obj == null || (getClass() != obj.getClass())) { - return false; - } - MemStoreSizing other = (MemStoreSizing) obj; - return this.dataSize == other.dataSize && this.heapSize == other.heapSize; - } - - @Override - public int hashCode() { - long h = 13 * this.dataSize; - h = h + 14 * this.heapSize; - return (int) h; - } - - @Override - public String toString() { - return "dataSize=" + this.dataSize + " , heapSize=" + this.heapSize; - } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java index 1a0317d..cbd60e5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java @@ -30,8 +30,7 @@ import java.util.List; public class MemStoreSnapshot implements Closeable { private final long id; private final int cellsCount; - private final long dataSize; - private final long heapSize; + private final MemStoreSize memStoreSize; private final TimeRangeTracker timeRangeTracker; private final List<KeyValueScanner> scanners; private final boolean tagsPresent; @@ -39,8 +38,7 @@ public class MemStoreSnapshot implements Closeable { public MemStoreSnapshot(long id, ImmutableSegment snapshot) { this.id = id; this.cellsCount = snapshot.getCellsCount(); - this.dataSize = snapshot.keySize(); - this.heapSize = snapshot.heapSize(); + this.memStoreSize = snapshot.getMemStoreSize(); this.timeRangeTracker = snapshot.getTimeRangeTracker(); this.scanners = snapshot.getScanners(Long.MAX_VALUE, Long.MAX_VALUE); this.tagsPresent = snapshot.isTagsPresent(); @@ -60,15 +58,12 @@ public class MemStoreSnapshot implements Closeable { return cellsCount; } - /** - * @return Total memory size occupied by this snapshot. - */ public long getDataSize() { - return dataSize; + return memStoreSize.getDataSize(); } - public long getHeapSize() { - return heapSize; + public MemStoreSize getMemStoreSize() { + return memStoreSize; } /** @@ -100,4 +95,5 @@ public class MemStoreSnapshot implements Closeable { } } } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java index 168b42e..f06f747 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java @@ -72,7 +72,7 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr for (Store store : r.getStores()) { tempStorefilesSize += store.getStorefilesSize(); } - metricsTable.setMemStoresSize(metricsTable.getMemStoresSize() + r.getMemStoreSize()); + metricsTable.setMemStoresSize(metricsTable.getMemStoresSize() + r.getMemStoreDataSize()); metricsTable.setStoreFilesSize(metricsTable.getStoreFilesSize() + tempStorefilesSize); metricsTable.setTableSize(metricsTable.getMemStoresSize() + metricsTable.getStoreFilesSize()); metricsTable.setReadRequestsCount(metricsTable.getReadRequestsCount() + r.getReadRequestsCount()); http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java index fe7bdf9..1349921 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java @@ -44,7 +44,7 @@ public class MutableSegment extends Segment { protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) { super(cellSet, comparator, memStoreLAB, TimeRangeTracker.create(TimeRangeTracker.Type.SYNC)); - incSize(0,DEEP_OVERHEAD); // update the mutable segment metadata + incSize(0, DEEP_OVERHEAD, 0); // update the mutable segment metadata } /** @@ -88,9 +88,10 @@ public class MutableSegment extends Segment { // removed cell is from MSLAB or not. Will do once HBASE-16438 is in int cellLen = getCellLength(cur); long heapSize = heapSizeChange(cur, true); - this.incSize(-cellLen, -heapSize); + long offHeapSize = offHeapSizeChange(cur, true); + this.incSize(-cellLen, -heapSize, -offHeapSize); if (memStoreSizing != null) { - memStoreSizing.decMemStoreSize(cellLen, heapSize); + memStoreSizing.decMemStoreSize(cellLen, heapSize, offHeapSize); } it.remove(); } else { http://git-wip-us.apache.org/repos/asf/hbase/blob/a458d7c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 52d01fe..27771ce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -150,7 +150,21 @@ public interface Region extends ConfigurationObserver { * the memstores of this Region. Means size in bytes for key, value and tags within Cells. * It wont consider any java heap overhead for the cell objects or any other. */ - long getMemStoreSize(); + long getMemStoreDataSize(); + + /** + * @return memstore heap size for this region, in bytes. It accounts data size of cells + * added to the memstores of this Region, as well as java heap overhead for the cell + * objects or any other. + */ + long getMemStoreHeapSize(); + + /** + * @return memstore off-heap size for this region, in bytes. It accounts data size of cells + * added to the memstores of this Region, as well as overhead for the cell + * objects or any other that is allocated off-heap. + */ + long getMemStoreOffHeapSize(); /** @return the number of mutations processed bypassing the WAL */ long getNumMutationsWithoutWAL();