HBASE-15134 Add visibility into Flush and Compaction queues

Conflicts:
        hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c35fa2a3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c35fa2a3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c35fa2a3

Branch: refs/heads/branch-1.3
Commit: c35fa2a3e115a41b1fcb7afc533dddeefa74163d
Parents: 3e0d693
Author: Abhishek Singh Chouhan <achou...@apache.org>
Authored: Fri Jul 28 13:17:32 2017 +0530
Committer: Andrew Purtell <apurt...@apache.org>
Committed: Wed Dec 12 18:08:17 2018 -0800

----------------------------------------------------------------------
 .../hbase/regionserver/MetricsRegionSource.java |  8 +++++
 .../regionserver/MetricsRegionWrapper.java      | 24 +++++++++++++++
 .../regionserver/MetricsRegionSourceImpl.java   | 19 +++++++++++-
 .../TestMetricsRegionSourceImpl.java            | 20 ++++++++++++
 .../hbase/regionserver/CompactSplitThread.java  |  9 +++++-
 .../hadoop/hbase/regionserver/HRegion.java      | 20 +++++++++++-
 .../hbase/regionserver/MemStoreFlusher.java     |  2 ++
 .../regionserver/MetricsRegionWrapperImpl.java  | 32 ++++++++++++++++++++
 .../regionserver/MetricsRegionWrapperStub.java  | 20 ++++++++++++
 .../hbase/regionserver/TestMetricsRegion.java   | 12 ++++++++
 10 files changed, 163 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c35fa2a3/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java
index decf841..d5738cf 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java
@@ -30,11 +30,19 @@ public interface MetricsRegionSource extends Comparable<MetricsRegionSource> {
   String COMPACTIONS_COMPLETED_COUNT = "compactionsCompletedCount";
   String COMPACTIONS_FAILED_COUNT = "compactionsFailedCount";
   String LAST_MAJOR_COMPACTION_AGE = "lastMajorCompactionAge";
+  String COMPACTIONS_QUEUED_COUNT = "compactionsQueuedCount";
+  String MAX_COMPACTION_QUEUE_SIZE = "maxCompactionQueueSize";
   String NUM_BYTES_COMPACTED_COUNT = "numBytesCompactedCount";
   String NUM_FILES_COMPACTED_COUNT = "numFilesCompactedCount";
+  String FLUSHES_QUEUED_COUNT = "flushesQueuedCount";
+  String MAX_FLUSH_QUEUE_SIZE = "maxFlushQueueSize";
   String COMPACTIONS_COMPLETED_DESC = "Number of compactions that have completed.";
   String COMPACTIONS_FAILED_DESC = "Number of compactions that have failed.";
   String LAST_MAJOR_COMPACTION_DESC = "Age of the last major compaction in milliseconds.";
+  String COMPACTIONS_QUEUED_DESC = "Number of compactions that are queued/running for this region";
+  String MAX_COMPACTION_QUEUE_DESC = "Max number of compactions queued for this region";
+  String FLUSHES_QUEUED_DESC = "Number flushes requested/queued for this region";
+  String MAX_FLUSH_QUEUE_DESC = "Max number of flushes queued for this region";
   String  NUM_BYTES_COMPACTED_DESC =
       "Sum of filesize on all files entering a finished, successful or aborted, compaction";
   String NUM_FILES_COMPACTED_DESC =

http://git-wip-us.apache.org/repos/asf/hbase/blob/c35fa2a3/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java
index 9b7acd3..9a725cd 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java
@@ -112,6 +112,30 @@ public interface MetricsRegionWrapper {
    */
   long getNumCompactionsFailed();
 
+  /**
+   * @return the total number of compactions that are currently queued(or being executed) at point in
+   *  time
+   */
+  long getNumCompactionsQueued();
+
+  /**
+   * @return the total number of flushes currently queued(being executed) for this region at point in
+   *  time
+   */
+  long getNumFlushesQueued();
+
+  /**
+   * @return the max number of compactions queued for this region
+   * Note that this metric is updated periodically and hence might miss some data points
+   */
+  long getMaxCompactionQueueSize();
+
+  /**
+   * @return the max number of flushes queued for this region
+   * Note that this metric is updated periodically and hence might miss some data points
+   */
+  long getMaxFlushQueueSize();
+
   int getRegionHashCode();
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/c35fa2a3/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java
index 924da8b..c50ffc9 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java
@@ -265,9 +265,26 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource {
               regionNamePrefix + MetricsRegionServerSource.WRITE_REQUEST_COUNT,
               MetricsRegionServerSource.WRITE_REQUEST_COUNT_DESC),
           this.regionWrapper.getWriteRequestCount());
-      mrb.addCounter(Interns.info(regionNamePrefix + MetricsRegionSource.REPLICA_ID,
+      mrb.addCounter(Interns.info(
+              regionNamePrefix + MetricsRegionSource.REPLICA_ID,
               MetricsRegionSource.REPLICA_ID_DESC),
           this.regionWrapper.getReplicaId());
+      mrb.addCounter(Interns.info(
+              regionNamePrefix + MetricsRegionSource.COMPACTIONS_QUEUED_COUNT,
+              MetricsRegionSource.COMPACTIONS_QUEUED_DESC),
+          this.regionWrapper.getNumCompactionsQueued());
+      mrb.addCounter(Interns.info(
+              regionNamePrefix + MetricsRegionSource.FLUSHES_QUEUED_COUNT,
+              MetricsRegionSource.FLUSHES_QUEUED_DESC),
+          this.regionWrapper.getNumFlushesQueued());
+      mrb.addCounter(Interns.info(
+              regionNamePrefix + MetricsRegionSource.MAX_COMPACTION_QUEUE_SIZE,
+              MetricsRegionSource.MAX_COMPACTION_QUEUE_DESC),
+          this.regionWrapper.getMaxCompactionQueueSize());
+      mrb.addCounter(Interns.info(
+              regionNamePrefix + MetricsRegionSource.MAX_FLUSH_QUEUE_SIZE,
+              MetricsRegionSource.MAX_FLUSH_QUEUE_DESC),
+          this.regionWrapper.getMaxFlushQueueSize());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/c35fa2a3/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java
index a0b7612..b39c467 100644
--- a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java
+++ b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java
@@ -171,5 +171,25 @@ public class TestMetricsRegionSourceImpl {
     public int getReplicaId() {
       return 0;
     }
+
+    @Override
+    public long getNumCompactionsQueued() {
+      return 0;
+    }
+
+    @Override
+    public long getNumFlushesQueued() {
+      return 0;
+    }
+
+    @Override
+    public long getMaxCompactionQueueSize() {
+      return 0;
+    }
+
+    @Override
+    public long getMaxFlushQueueSize() {
+      return 0;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c35fa2a3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
index 71a8803..e6fe9cd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
@@ -356,6 +356,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
     ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
       ? longCompactions : shortCompactions;
     pool.execute(new CompactionRunner(s, r, compaction, pool, user));
+    ((HRegion)r).incrementCompactionsQueuedCount();
     if (LOG.isDebugEnabled()) {
       String type = (pool == shortCompactions) ? "Small " : "Large ";
       LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
@@ -498,9 +499,13 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
         } catch (IOException ex) {
           LOG.error("Compaction selection failed " + this, ex);
           server.checkFileSystem();
+          region.decrementCompactionsQueuedCount();
           return;
         }
-        if (this.compaction == null) return; // nothing to do
+        if (this.compaction == null) {
+          region.decrementCompactionsQueuedCount();
+          return; // nothing to do
+        }
         // Now see if we are in correct pool for the size; if not, go to the correct one.
         // We might end up waiting for a while, so cancel the selection.
         assert this.compaction.hasSelection();
@@ -552,6 +557,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
         region.reportCompactionRequestFailure();
         server.checkFileSystem();
       } finally {
+        region.decrementCompactionsQueuedCount();
         LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
       }
       this.compaction.getRequest().afterExecute();
@@ -562,6 +568,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
       Preconditions.checkNotNull(server);
       if (server.isStopped()
           || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
+        region.decrementCompactionsQueuedCount();
         return;
       }
       doCompaction(user);

http://git-wip-us.apache.org/repos/asf/hbase/blob/c35fa2a3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index d9dbb72..74148ea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -297,6 +297,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   final AtomicLong compactionsFailed = new AtomicLong(0L);
   final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L);
   final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L);
+  final AtomicLong compactionsQueued = new AtomicLong(0L);
+  final AtomicLong flushesQueued = new AtomicLong(0L);
 
   private final WAL wal;
   private final HRegionFileSystem fs;
@@ -2088,6 +2090,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           coprocessorHost.postFlush();
         }
 
+        if(fs.isFlushSucceeded()) {
+          flushesQueued.set(0L);
+        }
+
         status.markComplete("Flush successful");
         return fs;
       } finally {
@@ -8141,7 +8147,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      46 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
+      48 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
       (14 * Bytes.SIZEOF_LONG) +
       5 * Bytes.SIZEOF_BOOLEAN);
 
@@ -8723,6 +8729,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     compactionsFailed.incrementAndGet();
   }
 
+  public void incrementCompactionsQueuedCount() {
+    compactionsQueued.incrementAndGet();
+  }
+
+  public void decrementCompactionsQueuedCount() {
+    compactionsQueued.decrementAndGet();
+  }
+
+  public void incrementFlushesQueuedCount() {
+    flushesQueued.incrementAndGet();
+  }
+
   /**
    * Do not change this sequence id.
    * @return sequenceId

http://git-wip-us.apache.org/repos/asf/hbase/blob/c35fa2a3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index b152958..b4adea6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -361,6 +361,7 @@ class MemStoreFlusher implements FlushRequester {
 
   @Override
   public void requestFlush(Region r, boolean forceFlushAllStores) {
+    ((HRegion)r).incrementFlushesQueuedCount();
     synchronized (regionsInQueue) {
       if (!regionsInQueue.containsKey(r)) {
         // This entry has no delay so it will be added at the top of the flush
@@ -374,6 +375,7 @@ class MemStoreFlusher implements FlushRequester {
 
   @Override
   public void requestDelayedFlush(Region r, long delay, boolean forceFlushAllStores) {
+    ((HRegion)r).incrementFlushesQueuedCount();
     synchronized (regionsInQueue) {
       if (!regionsInQueue.containsKey(r)) {
         // This entry has some delay

http://git-wip-us.apache.org/repos/asf/hbase/blob/c35fa2a3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
index 493ee18..60085f6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
@@ -52,6 +52,8 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
   private long minStoreFileAge;
   private long avgStoreFileAge;
   private long numReferenceFiles;
+  private long maxFlushQueueSize;
+  private long maxCompactionQueueSize;
 
   private ScheduledFuture<?> regionMetricsUpdateTask;
 
@@ -158,6 +160,26 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
   }
 
   @Override
+  public long getNumCompactionsQueued() {
+    return this.region.compactionsQueued.get();
+  }
+
+  @Override
+  public long getNumFlushesQueued() {
+    return this.region.flushesQueued.get();
+  }
+
+  @Override
+  public long getMaxCompactionQueueSize() {
+    return maxCompactionQueueSize;
+  }
+
+  @Override
+  public long getMaxFlushQueueSize() {
+    return maxFlushQueueSize;
+  }
+
+  @Override
   public long getMaxStoreFileAge() {
     return maxStoreFileAge;
   }
@@ -192,6 +214,8 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
       long tempMaxStoreFileAge = 0;
       long tempMinStoreFileAge = Long.MAX_VALUE;
       long tempNumReferenceFiles = 0;
+      long tempMaxCompactionQueueSize = 0;
+      long tempMaxFlushQueueSize = 0;
 
       long avgAgeNumerator = 0;
       long numHFiles = 0;
@@ -229,6 +253,14 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
       }
 
       numReferenceFiles = tempNumReferenceFiles;
+      tempMaxCompactionQueueSize = getNumCompactionsQueued();
+      tempMaxFlushQueueSize = getNumFlushesQueued();
+      if (tempMaxCompactionQueueSize > maxCompactionQueueSize) {
+        maxCompactionQueueSize = tempMaxCompactionQueueSize;
+      }
+      if (tempMaxFlushQueueSize > maxFlushQueueSize) {
+        maxFlushQueueSize = tempMaxFlushQueueSize;
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/c35fa2a3/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java
index 8ae1180..4f18144 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java
@@ -137,4 +137,24 @@ public class MetricsRegionWrapperStub implements MetricsRegionWrapper {
   public int getReplicaId() {
     return replicaid;
   }
+
+  @Override
+  public long getNumCompactionsQueued() {
+    return 4;
+  }
+
+  @Override
+  public long getNumFlushesQueued() {
+    return 6;
+  }
+
+  @Override
+  public long getMaxCompactionQueueSize() {
+    return 4;
+  }
+
+  @Override
+  public long getMaxFlushQueueSize() {
+    return 6;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c35fa2a3/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegion.java
index febcd28..2009da9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegion.java
@@ -76,6 +76,18 @@ public class TestMetricsRegion {
     HELPER.assertCounter(
       "namespace_TestNS_table_MetricsRegionWrapperStub_region_DEADBEEF001_metric_replicaid",
       1, agg);
+    HELPER.assertCounter(
+      "namespace_TestNS_table_MetricsRegionWrapperStub_region_DEADBEEF001_metric_compactionsQueuedCount",
+      4, agg);
+    HELPER.assertCounter(
+      "namespace_TestNS_table_MetricsRegionWrapperStub_region_DEADBEEF001_metric_flushesQueuedCount",
+      6, agg);
+    HELPER.assertCounter(
+      "namespace_TestNS_table_MetricsRegionWrapperStub_region_DEADBEEF001_metric_maxCompactionQueueSize",
+      4, agg);
+    HELPER.assertCounter(
+      "namespace_TestNS_table_MetricsRegionWrapperStub_region_DEADBEEF001_metric_maxFlushQueueSize",
+      6, agg);
     mr.close();
   }
 }

Reply via email to