Repository: hbase
Updated Branches:
  refs/heads/branch-1 e3d70db4a -> 2eb810d0f


HBASE-17627 Active workers metric for thrift (Ashu Pachauri)

Signed-off-by: Gary Helmling <ga...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2eb810d0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2eb810d0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2eb810d0

Branch: refs/heads/branch-1
Commit: 2eb810d0f7dd79e30176e9c8fbd1a69905c9bb63
Parents: e3d70db
Author: Ashu Pachauri <ashu210...@gmail.com>
Authored: Tue Feb 14 13:23:02 2017 -0800
Committer: Gary Helmling <ga...@apache.org>
Committed: Wed Feb 15 15:46:29 2017 -0800

----------------------------------------------------------------------
 .../hbase/thrift/MetricsThriftServerSource.java | 11 ++++
 .../thrift/MetricsThriftServerSourceImpl.java   | 12 ++++
 .../hbase/thrift/TBoundedThreadPoolServer.java  |  4 +-
 .../hbase/thrift/THBaseThreadPoolExecutor.java  | 61 ++++++++++++++++++++
 .../hadoop/hbase/thrift/ThriftMetrics.java      |  8 +++
 .../hadoop/hbase/thrift/ThriftServerRunner.java |  4 +-
 .../hadoop/hbase/thrift2/ThriftServer.java      | 17 ++++--
 7 files changed, 109 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/2eb810d0/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/thrift/MetricsThriftServerSource.java
----------------------------------------------------------------------
diff --git 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/thrift/MetricsThriftServerSource.java
 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/thrift/MetricsThriftServerSource.java
index 77fb11a..276a40c 100644
--- 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/thrift/MetricsThriftServerSource.java
+++ 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/thrift/MetricsThriftServerSource.java
@@ -32,6 +32,7 @@ public interface MetricsThriftServerSource extends 
ExceptionTrackingSource, JvmP
   String THRIFT_CALL_KEY = "thriftCall";
   String SLOW_THRIFT_CALL_KEY = "slowThriftCall";
   String CALL_QUEUE_LEN_KEY = "callQueueLen";
+  String ACTIVE_WORKER_COUNT_KEY = "numActiveWorkers";
 
   /**
    * Add how long an operation was in the queue.
@@ -75,4 +76,14 @@ public interface MetricsThriftServerSource extends 
ExceptionTrackingSource, JvmP
    * @param time Time
    */
   void incSlowCall(long time);
+
+  /**
+   * Increment number of active thrift workers.
+   */
+  void incActiveWorkerCount();
+
+  /**
+   * Decrement number of active thrift workers.
+   */
+  void decActiveWorkerCount();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2eb810d0/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/thrift/MetricsThriftServerSourceImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/thrift/MetricsThriftServerSourceImpl.java
 
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/thrift/MetricsThriftServerSourceImpl.java
index 27323ac..d47dbe7 100644
--- 
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/thrift/MetricsThriftServerSourceImpl.java
+++ 
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/thrift/MetricsThriftServerSourceImpl.java
@@ -50,6 +50,8 @@ public class MetricsThriftServerSourceImpl extends 
ExceptionTrackingSourceImpl i
   private final MetricHistogram pausesWithGc;
   private final MetricHistogram pausesWithoutGc;
 
+  private MutableGaugeLong activeWorkerCountGauge;
+
   public MetricsThriftServerSourceImpl(String metricsName,
                                        String metricsDescription,
                                        String metricsContext,
@@ -74,6 +76,7 @@ public class MetricsThriftServerSourceImpl extends 
ExceptionTrackingSourceImpl i
     thriftCallStat = getMetricsRegistry().newTimeHistogram(THRIFT_CALL_KEY);
     thriftSlowCallStat = 
getMetricsRegistry().newTimeHistogram(SLOW_THRIFT_CALL_KEY);
     callQueueLenGauge = getMetricsRegistry().getGauge(CALL_QUEUE_LEN_KEY, 0);
+    activeWorkerCountGauge = 
getMetricsRegistry().getGauge(ACTIVE_WORKER_COUNT_KEY, 0);
   }
 
   @Override
@@ -112,6 +115,15 @@ public class MetricsThriftServerSourceImpl extends 
ExceptionTrackingSourceImpl i
     thriftSlowCallStat.add(time);
   }
 
+  public void incActiveWorkerCount() {
+    activeWorkerCountGauge.incr();
+  }
+
+  @Override
+  public void decActiveWorkerCount() {
+    activeWorkerCountGauge.decr();
+  }
+
   @Override
   public void incInfoThresholdExceeded(int count) {
     infoPauseThresholdExceeded.incr(count);

http://git-wip-us.apache.org/repos/asf/hbase/blob/2eb810d0/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java
 
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java
index 84613bd..6ece753 100644
--- 
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java
+++ 
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java
@@ -153,9 +153,9 @@ public class TBoundedThreadPoolServer extends TServer {
     tfb.setDaemon(true);
     tfb.setNameFormat("thrift-worker-%d");
     executorService =
-        new ThreadPoolExecutor(options.minWorkerThreads,
+        new THBaseThreadPoolExecutor(options.minWorkerThreads,
             options.maxWorkerThreads, options.threadKeepAliveTimeSec,
-            TimeUnit.SECONDS, this.callQueue, tfb.build());
+            TimeUnit.SECONDS, this.callQueue, tfb.build(), metrics);
     serverOptions = options;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/2eb810d0/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/THBaseThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git 
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/THBaseThreadPoolExecutor.java
 
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/THBaseThreadPoolExecutor.java
new file mode 100644
index 0000000..973cad7
--- /dev/null
+++ 
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/THBaseThreadPoolExecutor.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.thrift;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A ThreadPoolExecutor customized for working with HBase thrift to update 
metrics before and
+ * after the execution of a task.
+ */
+
+public class THBaseThreadPoolExecutor extends ThreadPoolExecutor {
+
+  private ThriftMetrics metrics;
+
+  public THBaseThreadPoolExecutor(int corePoolSize, int maxPoolSize, long 
keepAliveTime,
+      TimeUnit unit, BlockingQueue<Runnable> workQueue, ThriftMetrics metrics) 
{
+    this(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, null, 
metrics);
+  }
+
+  public THBaseThreadPoolExecutor(int corePoolSize, int maxPoolSize, long 
keepAliveTime,
+      TimeUnit unit, BlockingQueue<Runnable> workQueue,
+      ThreadFactory threadFactory,ThriftMetrics metrics) {
+    super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue);
+    if (threadFactory != null) {
+      setThreadFactory(threadFactory);
+    }
+    this.metrics = metrics;
+  }
+
+  @Override
+  protected void beforeExecute(Thread t, Runnable r) {
+    super.beforeExecute(t, r);
+    metrics.incActiveWorkerCount();
+  }
+
+  @Override
+  protected void afterExecute(Runnable r, Throwable t) {
+    metrics.decActiveWorkerCount();
+    super.afterExecute(r, t);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2eb810d0/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java
----------------------------------------------------------------------
diff --git 
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java 
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java
index dc69b33..1f5dc95 100644
--- 
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java
+++ 
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java
@@ -99,6 +99,14 @@ public class ThriftMetrics  {
     }
   }
 
+  public void incActiveWorkerCount() {
+    source.incActiveWorkerCount();
+  }
+
+  public void decActiveWorkerCount() {
+    source.decActiveWorkerCount();
+  }
+
   /**
    * Increment the count for a specific exception type.  This is called for 
each exception type
    * that is returned to the thrift handler.

http://git-wip-us.apache.org/repos/asf/hbase/blob/2eb810d0/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
----------------------------------------------------------------------
diff --git 
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
 
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
index 8ad387a..f3efd6f 100644
--- 
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
+++ 
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
@@ -620,8 +620,8 @@ public class ThriftServerRunner implements Runnable {
     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
     tfb.setDaemon(true);
     tfb.setNameFormat("thrift-worker-%d");
-    return new ThreadPoolExecutor(minWorkers, maxWorkers,
-            Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
+    return new THBaseThreadPoolExecutor(minWorkers, maxWorkers,
+            Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build(), metrics);
   }
 
   private InetAddress getBindAddress(Configuration conf)

http://git-wip-us.apache.org/repos/asf/hbase/blob/2eb810d0/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java 
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
index e93a390..e0d8613 100644
--- 
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
+++ 
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -58,6 +59,7 @@ import org.apache.hadoop.hbase.security.SecurityUtil;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.thrift.CallQueue;
 import org.apache.hadoop.hbase.thrift.CallQueue.Call;
+import org.apache.hadoop.hbase.thrift.THBaseThreadPoolExecutor;
 import org.apache.hadoop.hbase.thrift.ThriftMetrics;
 import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
 import org.apache.hadoop.hbase.util.DNS;
@@ -312,8 +314,8 @@ public class ThriftServer {
     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
     tfb.setDaemon(true);
     tfb.setNameFormat("thrift2-worker-%d");
-    ThreadPoolExecutor pool = new ThreadPoolExecutor(workerThreads, 
workerThreads,
-            Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
+    ThreadPoolExecutor pool = new THBaseThreadPoolExecutor(workerThreads, 
workerThreads,
+            Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build(), metrics);
     pool.prestartAllCoreThreads();
     return pool;
   }
@@ -324,7 +326,8 @@ public class ThriftServer {
                                               int workerThreads,
                                               InetSocketAddress 
inetSocketAddress,
                                               int backlog,
-                                              int clientTimeout)
+                                              int clientTimeout,
+                                              ThriftMetrics metrics)
       throws TTransportException {
     TServerTransport serverTransport = new TServerSocket(
                                            new 
TServerSocket.ServerSocketTransportArgs().
@@ -338,6 +341,11 @@ public class ThriftServer {
     if (workerThreads > 0) {
       serverArgs.maxWorkerThreads(workerThreads);
     }
+    ThreadPoolExecutor executor = new 
THBaseThreadPoolExecutor(serverArgs.minWorkerThreads,
+        serverArgs.maxWorkerThreads, serverArgs.stopTimeoutVal, 
TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>(), metrics);
+    serverArgs.executorService(executor);
+
     return new TThreadPoolServer(serverArgs);
   }
 
@@ -568,7 +576,8 @@ public class ThriftServer {
           workerThreads,
           inetSocketAddress,
           backlog,
-          readTimeout);
+          readTimeout,
+          metrics);
     }
 
     final TServer tserver = server;

Reply via email to