HBASE-15967 Metric for active ipc Readers and make default fraction of cpu count
Add new metric hbase.regionserver.ipc.runningReaders
Also make it so Reader count is a factor of processor count


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1125215a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1125215a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1125215a

Branch: refs/heads/hbase-12439
Commit: 1125215aad3f5b149f3458ba7019c5920f6dca66
Parents: e0b70c0
Author: stack <st...@apache.org>
Authored: Sun Jun 5 11:12:05 2016 -0700
Committer: stack <st...@apache.org>
Committed: Tue Jun 7 13:10:14 2016 -0700

----------------------------------------------------------------------
 .../hbase/ipc/MetricsHBaseServerSource.java     | 11 +++++++---
 .../hbase/ipc/MetricsHBaseServerSourceImpl.java | 19 ++++++++++++++++
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  | 23 +++++++++++++++-----
 3 files changed, 45 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/1125215a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
----------------------------------------------------------------------
diff --git 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
index ce57e0f..43515cd 100644
--- 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
+++ 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
@@ -86,6 +86,13 @@ public interface MetricsHBaseServerSource extends BaseSource 
{
   String EXCEPTIONS_MULTI_TOO_LARGE_DESC = "A response to a multi request was 
too large and the " +
       "rest of the requests will have to be retried.";
 
+  String RUNNING_READERS = "runningReaders";
+  String RUNNING_READERS_DESCRIPTION =
+      "Count of Reader threads currently busy parsing requests to hand off to 
the scheduler";
+
+  void incrRunningReaders();
+  void decrRunningReaders();
+
   void authorizationSuccess();
 
   void authorizationFailure();
@@ -122,6 +129,4 @@ public interface MetricsHBaseServerSource extends 
BaseSource {
   void processedCall(int processingTime);
 
   void queuedAndProcessedCall(int totalTime);
-
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/1125215a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
 
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
index c72641d..24cc0fb 100644
--- 
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
+++ 
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
@@ -57,6 +57,12 @@ public class MetricsHBaseServerSourceImpl extends 
BaseSourceImpl
   private MetricHistogram requestSize;
   private MetricHistogram responseSize;
 
+  /**
+   * The count of readers currently working parsing a request as opposed to 
being blocked on the
+   * selector waiting on requests to come in.
+   */
+  private final MutableFastCounter runningReaders;
+
   public MetricsHBaseServerSourceImpl(String metricsName,
                                       String metricsDescription,
                                       String metricsContext,
@@ -86,6 +92,9 @@ public class MetricsHBaseServerSourceImpl extends 
BaseSourceImpl
     this.exceptionsMultiTooLarge = this.getMetricsRegistry()
         .newCounter(EXCEPTIONS_MULTI_TOO_LARGE_NAME, 
EXCEPTIONS_MULTI_TOO_LARGE_DESC, 0L);
 
+    this.runningReaders = this.getMetricsRegistry()
+        .newCounter(RUNNING_READERS, RUNNING_READERS_DESCRIPTION, 0L);
+
     this.authenticationSuccesses = this.getMetricsRegistry().newCounter(
         AUTHENTICATION_SUCCESSES_NAME, AUTHENTICATION_SUCCESSES_DESC, 0L);
     this.authenticationFailures = 
this.getMetricsRegistry().newCounter(AUTHENTICATION_FAILURES_NAME,
@@ -109,6 +118,16 @@ public class MetricsHBaseServerSourceImpl extends 
BaseSourceImpl
   }
 
   @Override
+  public void incrRunningReaders() {
+    this.runningReaders.incr(+1);
+  }
+
+  @Override
+  public void decrRunningReaders() {
+    this.runningReaders.incr(-1);
+  }
+
+  @Override
   public void authorizationSuccess() {
     authorizationSuccesses.incr();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1125215a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index aca3fdd..c9d2639 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -625,7 +625,8 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
 
     public Listener(final String name) throws IOException {
       super(name);
-      // The backlog of requests that we will have the serversocket carry.
+      // The backlog of requests that we will have the serversocket carry. It 
is not enough
+      // just setting this config. You need to set the backlog in the kernel 
too.
       int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 
128);
       // Create a new server socket and set to non blocking mode
       acceptChannel = ServerSocketChannel.open();
@@ -690,7 +691,12 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
               iter.remove();
               if (key.isValid()) {
                 if (key.isReadable()) {
-                  doRead(key);
+                  metrics.getMetricsSource().incrRunningReaders();
+                  try {
+                    doRead(key);
+                  } finally {
+                    metrics.getMetricsSource().decrRunningReaders();
+                  }
                 }
               }
               key = null;
@@ -734,8 +740,9 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
             iter.remove();
             try {
               if (key.isValid()) {
-                if (key.isAcceptable())
+                if (key.isAcceptable()) {
                   doAccept(key);
+                }
               }
             } catch (IOException ignored) {
               if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
@@ -830,7 +837,8 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
       try {
         count = c.readAndProcess();
       } catch (InterruptedException ieo) {
-        LOG.info(Thread.currentThread().getName() + ": readAndProcess caught 
InterruptedException", ieo);
+        LOG.info(Thread.currentThread().getName() +
+            ": readAndProcess caught InterruptedException", ieo);
         throw ieo;
       } catch (Exception e) {
         if (LOG.isDebugEnabled()) {
@@ -1159,6 +1167,7 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
     private ByteBuffer dataLengthBuffer;
     protected final ConcurrentLinkedDeque<Call> responseQueue = new 
ConcurrentLinkedDeque<Call>();
     private final Lock responseWriteLock = new ReentrantLock();
+    // EXPENSIVE: Counters cost lots of CPU. Remove. Used just to see if idle 
or not. Use boolean.
     private Counter rpcCount = new Counter(); // number of outstanding rpcs
     private long lastContact;
     private InetAddress addr;
@@ -2000,7 +2009,11 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
     // See declaration above for documentation on what this size is.
     this.maxQueueSizeInBytes =
       this.conf.getLong("hbase.ipc.server.max.callqueue.size", 
DEFAULT_MAX_CALLQUEUE_SIZE);
-    this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 
10);
+    // Have the Reader thread count default to 1/4 of the processors. This 
seems to do pretty
+    // well. See the metric hbase.regionserver.ipc.runningReaders to see if 
you need to change it.
+    int processors = Runtime.getRuntime().availableProcessors();
+    this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size",
+        Math.max(8, processors/ 4));
     this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
       2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
     this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, 
DEFAULT_WARN_RESPONSE_TIME);

Reply via email to