HBASE-15134 Add visibility into Flush and Compaction queues

Conflicts:
	hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c35fa2a3 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c35fa2a3 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c35fa2a3 Branch: refs/heads/branch-1.3 Commit: c35fa2a3e115a41b1fcb7afc533dddeefa74163d Parents: 3e0d693 Author: Abhishek Singh Chouhan <achou...@apache.org> Authored: Fri Jul 28 13:17:32 2017 +0530 Committer: Andrew Purtell <apurt...@apache.org> Committed: Wed Dec 12 18:08:17 2018 -0800 ---------------------------------------------------------------------- .../hbase/regionserver/MetricsRegionSource.java | 8 +++++ .../regionserver/MetricsRegionWrapper.java | 24 +++++++++++++++ .../regionserver/MetricsRegionSourceImpl.java | 19 +++++++++++- .../TestMetricsRegionSourceImpl.java | 20 ++++++++++++ .../hbase/regionserver/CompactSplitThread.java | 9 +++++- .../hadoop/hbase/regionserver/HRegion.java | 20 +++++++++++- .../hbase/regionserver/MemStoreFlusher.java | 2 ++ .../regionserver/MetricsRegionWrapperImpl.java | 32 ++++++++++++++++++++ .../regionserver/MetricsRegionWrapperStub.java | 20 ++++++++++++ .../hbase/regionserver/TestMetricsRegion.java | 12 ++++++++ 10 files changed, 163 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/c35fa2a3/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java ---------------------------------------------------------------------- diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java index decf841..d5738cf 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java +++ 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java @@ -30,11 +30,19 @@ public interface MetricsRegionSource extends Comparable<MetricsRegionSource> { String COMPACTIONS_COMPLETED_COUNT = "compactionsCompletedCount"; String COMPACTIONS_FAILED_COUNT = "compactionsFailedCount"; String LAST_MAJOR_COMPACTION_AGE = "lastMajorCompactionAge"; + String COMPACTIONS_QUEUED_COUNT = "compactionsQueuedCount"; + String MAX_COMPACTION_QUEUE_SIZE = "maxCompactionQueueSize"; String NUM_BYTES_COMPACTED_COUNT = "numBytesCompactedCount"; String NUM_FILES_COMPACTED_COUNT = "numFilesCompactedCount"; + String FLUSHES_QUEUED_COUNT = "flushesQueuedCount"; + String MAX_FLUSH_QUEUE_SIZE = "maxFlushQueueSize"; String COMPACTIONS_COMPLETED_DESC = "Number of compactions that have completed."; String COMPACTIONS_FAILED_DESC = "Number of compactions that have failed."; String LAST_MAJOR_COMPACTION_DESC = "Age of the last major compaction in milliseconds."; + String COMPACTIONS_QUEUED_DESC = "Number of compactions that are queued/running for this region"; + String MAX_COMPACTION_QUEUE_DESC = "Max number of compactions queued for this region"; + String FLUSHES_QUEUED_DESC = "Number flushes requested/queued for this region"; + String MAX_FLUSH_QUEUE_DESC = "Max number of flushes queued for this region"; String NUM_BYTES_COMPACTED_DESC = "Sum of filesize on all files entering a finished, successful or aborted, compaction"; String NUM_FILES_COMPACTED_DESC = http://git-wip-us.apache.org/repos/asf/hbase/blob/c35fa2a3/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java ---------------------------------------------------------------------- diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java index 9b7acd3..9a725cd 100644 --- 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java @@ -112,6 +112,30 @@ public interface MetricsRegionWrapper { */ long getNumCompactionsFailed(); + /** + * @return the total number of compactions that are currently queued(or being executed) at point in + * time + */ + long getNumCompactionsQueued(); + + /** + * @return the total number of flushes currently queued(being executed) for this region at point in + * time + */ + long getNumFlushesQueued(); + + /** + * @return the max number of compactions queued for this region + * Note that this metric is updated periodically and hence might miss some data points + */ + long getMaxCompactionQueueSize(); + + /** + * @return the max number of flushes queued for this region + * Note that this metric is updated periodically and hence might miss some data points + */ + long getMaxFlushQueueSize(); + int getRegionHashCode(); /** http://git-wip-us.apache.org/repos/asf/hbase/blob/c35fa2a3/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java ---------------------------------------------------------------------- diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java index 924da8b..c50ffc9 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java @@ -265,9 +265,26 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource { regionNamePrefix + MetricsRegionServerSource.WRITE_REQUEST_COUNT, MetricsRegionServerSource.WRITE_REQUEST_COUNT_DESC), this.regionWrapper.getWriteRequestCount()); - 
mrb.addCounter(Interns.info(regionNamePrefix + MetricsRegionSource.REPLICA_ID, + mrb.addCounter(Interns.info( + regionNamePrefix + MetricsRegionSource.REPLICA_ID, MetricsRegionSource.REPLICA_ID_DESC), this.regionWrapper.getReplicaId()); + mrb.addCounter(Interns.info( + regionNamePrefix + MetricsRegionSource.COMPACTIONS_QUEUED_COUNT, + MetricsRegionSource.COMPACTIONS_QUEUED_DESC), + this.regionWrapper.getNumCompactionsQueued()); + mrb.addCounter(Interns.info( + regionNamePrefix + MetricsRegionSource.FLUSHES_QUEUED_COUNT, + MetricsRegionSource.FLUSHES_QUEUED_DESC), + this.regionWrapper.getNumFlushesQueued()); + mrb.addCounter(Interns.info( + regionNamePrefix + MetricsRegionSource.MAX_COMPACTION_QUEUE_SIZE, + MetricsRegionSource.MAX_COMPACTION_QUEUE_DESC), + this.regionWrapper.getMaxCompactionQueueSize()); + mrb.addCounter(Interns.info( + regionNamePrefix + MetricsRegionSource.MAX_FLUSH_QUEUE_SIZE, + MetricsRegionSource.MAX_FLUSH_QUEUE_DESC), + this.regionWrapper.getMaxFlushQueueSize()); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c35fa2a3/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java ---------------------------------------------------------------------- diff --git a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java index a0b7612..b39c467 100644 --- a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java +++ b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java @@ -171,5 +171,25 @@ public class TestMetricsRegionSourceImpl { public int getReplicaId() { return 0; } + + @Override + public long getNumCompactionsQueued() { + return 0; + } + + @Override + public long getNumFlushesQueued() { + return 0; + } + + @Override + public long 
getMaxCompactionQueueSize() { + return 0; + } + + @Override + public long getMaxFlushQueueSize() { + return 0; + } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c35fa2a3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index 71a8803..e6fe9cd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -356,6 +356,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize())) ? longCompactions : shortCompactions; pool.execute(new CompactionRunner(s, r, compaction, pool, user)); + ((HRegion)r).incrementCompactionsQueuedCount(); if (LOG.isDebugEnabled()) { String type = (pool == shortCompactions) ? "Small " : "Large "; LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system") @@ -498,9 +499,13 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi } catch (IOException ex) { LOG.error("Compaction selection failed " + this, ex); server.checkFileSystem(); + region.decrementCompactionsQueuedCount(); return; } - if (this.compaction == null) return; // nothing to do + if (this.compaction == null) { + region.decrementCompactionsQueuedCount(); + return; // nothing to do + } // Now see if we are in correct pool for the size; if not, go to the correct one. // We might end up waiting for a while, so cancel the selection. 
assert this.compaction.hasSelection(); @@ -552,6 +557,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi region.reportCompactionRequestFailure(); server.checkFileSystem(); } finally { + region.decrementCompactionsQueuedCount(); LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this); } this.compaction.getRequest().afterExecute(); @@ -562,6 +568,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi Preconditions.checkNotNull(server); if (server.isStopped() || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) { + region.decrementCompactionsQueuedCount(); return; } doCompaction(user); http://git-wip-us.apache.org/repos/asf/hbase/blob/c35fa2a3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index d9dbb72..74148ea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -297,6 +297,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi final AtomicLong compactionsFailed = new AtomicLong(0L); final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L); final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L); + final AtomicLong compactionsQueued = new AtomicLong(0L); + final AtomicLong flushesQueued = new AtomicLong(0L); private final WAL wal; private final HRegionFileSystem fs; @@ -2088,6 +2090,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi coprocessorHost.postFlush(); } + if(fs.isFlushSucceeded()) { + flushesQueued.set(0L); + } + status.markComplete("Flush successful"); return fs; } finally { @@ 
-8141,7 +8147,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 46 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + + 48 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + (14 * Bytes.SIZEOF_LONG) + 5 * Bytes.SIZEOF_BOOLEAN); @@ -8723,6 +8729,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi compactionsFailed.incrementAndGet(); } + public void incrementCompactionsQueuedCount() { + compactionsQueued.incrementAndGet(); + } + + public void decrementCompactionsQueuedCount() { + compactionsQueued.decrementAndGet(); + } + + public void incrementFlushesQueuedCount() { + flushesQueued.incrementAndGet(); + } + /** * Do not change this sequence id. * @return sequenceId http://git-wip-us.apache.org/repos/asf/hbase/blob/c35fa2a3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index b152958..b4adea6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -361,6 +361,7 @@ class MemStoreFlusher implements FlushRequester { @Override public void requestFlush(Region r, boolean forceFlushAllStores) { + ((HRegion)r).incrementFlushesQueuedCount(); synchronized (regionsInQueue) { if (!regionsInQueue.containsKey(r)) { // This entry has no delay so it will be added at the top of the flush @@ -374,6 +375,7 @@ class MemStoreFlusher implements FlushRequester { @Override public void requestDelayedFlush(Region r, long delay, boolean forceFlushAllStores) { + ((HRegion)r).incrementFlushesQueuedCount(); 
synchronized (regionsInQueue) { if (!regionsInQueue.containsKey(r)) { // This entry has some delay http://git-wip-us.apache.org/repos/asf/hbase/blob/c35fa2a3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java index 493ee18..60085f6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java @@ -52,6 +52,8 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable private long minStoreFileAge; private long avgStoreFileAge; private long numReferenceFiles; + private long maxFlushQueueSize; + private long maxCompactionQueueSize; private ScheduledFuture<?> regionMetricsUpdateTask; @@ -158,6 +160,26 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable } @Override + public long getNumCompactionsQueued() { + return this.region.compactionsQueued.get(); + } + + @Override + public long getNumFlushesQueued() { + return this.region.flushesQueued.get(); + } + + @Override + public long getMaxCompactionQueueSize() { + return maxCompactionQueueSize; + } + + @Override + public long getMaxFlushQueueSize() { + return maxFlushQueueSize; + } + + @Override public long getMaxStoreFileAge() { return maxStoreFileAge; } @@ -192,6 +214,8 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable long tempMaxStoreFileAge = 0; long tempMinStoreFileAge = Long.MAX_VALUE; long tempNumReferenceFiles = 0; + long tempMaxCompactionQueueSize = 0; + long tempMaxFlushQueueSize = 0; long avgAgeNumerator = 0; long numHFiles = 0; @@ -229,6 +253,14 @@ public class 
MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable } numReferenceFiles = tempNumReferenceFiles; + tempMaxCompactionQueueSize = getNumCompactionsQueued(); + tempMaxFlushQueueSize = getNumFlushesQueued(); + if (tempMaxCompactionQueueSize > maxCompactionQueueSize) { + maxCompactionQueueSize = tempMaxCompactionQueueSize; + } + if (tempMaxFlushQueueSize > maxFlushQueueSize) { + maxFlushQueueSize = tempMaxFlushQueueSize; + } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c35fa2a3/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java index 8ae1180..4f18144 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java @@ -137,4 +137,24 @@ public class MetricsRegionWrapperStub implements MetricsRegionWrapper { public int getReplicaId() { return replicaid; } + + @Override + public long getNumCompactionsQueued() { + return 4; + } + + @Override + public long getNumFlushesQueued() { + return 6; + } + + @Override + public long getMaxCompactionQueueSize() { + return 4; + } + + @Override + public long getMaxFlushQueueSize() { + return 6; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c35fa2a3/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegion.java index febcd28..2009da9 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegion.java @@ -76,6 +76,18 @@ public class TestMetricsRegion { HELPER.assertCounter( "namespace_TestNS_table_MetricsRegionWrapperStub_region_DEADBEEF001_metric_replicaid", 1, agg); + HELPER.assertCounter( + "namespace_TestNS_table_MetricsRegionWrapperStub_region_DEADBEEF001_metric_compactionsQueuedCount", + 4, agg); + HELPER.assertCounter( + "namespace_TestNS_table_MetricsRegionWrapperStub_region_DEADBEEF001_metric_flushesQueuedCount", + 6, agg); + HELPER.assertCounter( + "namespace_TestNS_table_MetricsRegionWrapperStub_region_DEADBEEF001_metric_maxCompactionQueueSize", + 4, agg); + HELPER.assertCounter( + "namespace_TestNS_table_MetricsRegionWrapperStub_region_DEADBEEF001_metric_maxFlushQueueSize", + 6, agg); mr.close(); } }