Repository: hbase Updated Branches: refs/heads/branch-1 e3d70db4a -> 2eb810d0f
HBASE-17627 Active workers metric for thrift (Ashu Pachauri) Signed-off-by: Gary Helmling <ga...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2eb810d0 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2eb810d0 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2eb810d0 Branch: refs/heads/branch-1 Commit: 2eb810d0f7dd79e30176e9c8fbd1a69905c9bb63 Parents: e3d70db Author: Ashu Pachauri <ashu210...@gmail.com> Authored: Tue Feb 14 13:23:02 2017 -0800 Committer: Gary Helmling <ga...@apache.org> Committed: Wed Feb 15 15:46:29 2017 -0800 ---------------------------------------------------------------------- .../hbase/thrift/MetricsThriftServerSource.java | 11 ++++ .../thrift/MetricsThriftServerSourceImpl.java | 12 ++++ .../hbase/thrift/TBoundedThreadPoolServer.java | 4 +- .../hbase/thrift/THBaseThreadPoolExecutor.java | 61 ++++++++++++++++++++ .../hadoop/hbase/thrift/ThriftMetrics.java | 8 +++ .../hadoop/hbase/thrift/ThriftServerRunner.java | 4 +- .../hadoop/hbase/thrift2/ThriftServer.java | 17 ++++-- 7 files changed, 109 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/2eb810d0/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/thrift/MetricsThriftServerSource.java ---------------------------------------------------------------------- diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/thrift/MetricsThriftServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/thrift/MetricsThriftServerSource.java index 77fb11a..276a40c 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/thrift/MetricsThriftServerSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/thrift/MetricsThriftServerSource.java @@ -32,6 +32,7 @@ public interface MetricsThriftServerSource extends ExceptionTrackingSource, JvmP String THRIFT_CALL_KEY = "thriftCall"; String SLOW_THRIFT_CALL_KEY = "slowThriftCall"; String CALL_QUEUE_LEN_KEY = "callQueueLen"; + String ACTIVE_WORKER_COUNT_KEY = "numActiveWorkers"; /** * Add how long an operation was in the queue. @@ -75,4 +76,14 @@ public interface MetricsThriftServerSource extends ExceptionTrackingSource, JvmP * @param time Time */ void incSlowCall(long time); + + /** + * Increment number of active thrift workers. + */ + void incActiveWorkerCount(); + + /** + * Decrement number of active thrift workers. + */ + void decActiveWorkerCount(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/2eb810d0/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/thrift/MetricsThriftServerSourceImpl.java ---------------------------------------------------------------------- diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/thrift/MetricsThriftServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/thrift/MetricsThriftServerSourceImpl.java index 27323ac..d47dbe7 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/thrift/MetricsThriftServerSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/thrift/MetricsThriftServerSourceImpl.java @@ -50,6 +50,8 @@ public class MetricsThriftServerSourceImpl extends ExceptionTrackingSourceImpl i private final MetricHistogram pausesWithGc; private final MetricHistogram pausesWithoutGc; + private MutableGaugeLong activeWorkerCountGauge; + public MetricsThriftServerSourceImpl(String metricsName, String metricsDescription, String metricsContext, @@ -74,6 +76,7 @@ public class MetricsThriftServerSourceImpl extends ExceptionTrackingSourceImpl i thriftCallStat = getMetricsRegistry().newTimeHistogram(THRIFT_CALL_KEY); thriftSlowCallStat = getMetricsRegistry().newTimeHistogram(SLOW_THRIFT_CALL_KEY); callQueueLenGauge = getMetricsRegistry().getGauge(CALL_QUEUE_LEN_KEY, 0); + activeWorkerCountGauge = getMetricsRegistry().getGauge(ACTIVE_WORKER_COUNT_KEY, 0); } @Override @@ -112,6 +115,15 @@ public class MetricsThriftServerSourceImpl extends ExceptionTrackingSourceImpl i thriftSlowCallStat.add(time); } + public void incActiveWorkerCount() { + activeWorkerCountGauge.incr(); + } + + @Override + public void decActiveWorkerCount() { + activeWorkerCountGauge.decr(); + } + @Override public void incInfoThresholdExceeded(int count) { infoPauseThresholdExceeded.incr(count); http://git-wip-us.apache.org/repos/asf/hbase/blob/2eb810d0/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java ---------------------------------------------------------------------- diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java index 84613bd..6ece753 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java @@ -153,9 +153,9 @@ public class TBoundedThreadPoolServer extends TServer { tfb.setDaemon(true); tfb.setNameFormat("thrift-worker-%d"); executorService = - new ThreadPoolExecutor(options.minWorkerThreads, + new THBaseThreadPoolExecutor(options.minWorkerThreads, options.maxWorkerThreads, options.threadKeepAliveTimeSec, - TimeUnit.SECONDS, this.callQueue, tfb.build()); + TimeUnit.SECONDS, this.callQueue, tfb.build(), metrics); serverOptions = options; } http://git-wip-us.apache.org/repos/asf/hbase/blob/2eb810d0/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/THBaseThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/THBaseThreadPoolExecutor.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/THBaseThreadPoolExecutor.java new file mode 100644 index 0000000..973cad7 --- /dev/null +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/THBaseThreadPoolExecutor.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.thrift; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * A ThreadPoolExecutor customized for working with HBase thrift to update metrics before and + * after the execution of a task. + */ + +public class THBaseThreadPoolExecutor extends ThreadPoolExecutor { + + private ThriftMetrics metrics; + + public THBaseThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, + TimeUnit unit, BlockingQueue<Runnable> workQueue, ThriftMetrics metrics) { + this(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, null, metrics); + } + + public THBaseThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, + TimeUnit unit, BlockingQueue<Runnable> workQueue, + ThreadFactory threadFactory,ThriftMetrics metrics) { + super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue); + if (threadFactory != null) { + setThreadFactory(threadFactory); + } + this.metrics = metrics; + } + + @Override + protected void beforeExecute(Thread t, Runnable r) { + super.beforeExecute(t, r); + metrics.incActiveWorkerCount(); + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + metrics.decActiveWorkerCount(); + super.afterExecute(r, t); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/2eb810d0/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java ---------------------------------------------------------------------- diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java index dc69b33..1f5dc95 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java @@ -99,6 +99,14 @@ public class ThriftMetrics { } } + public void incActiveWorkerCount() { + source.incActiveWorkerCount(); + } + + public void decActiveWorkerCount() { + source.decActiveWorkerCount(); + } + /** * Increment the count for a specific exception type. This is called for each exception type * that is returned to the thrift handler. http://git-wip-us.apache.org/repos/asf/hbase/blob/2eb810d0/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java ---------------------------------------------------------------------- diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java index 8ad387a..f3efd6f 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java @@ -620,8 +620,8 @@ public class ThriftServerRunner implements Runnable { ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); tfb.setDaemon(true); tfb.setNameFormat("thrift-worker-%d"); - return new ThreadPoolExecutor(minWorkers, maxWorkers, - Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build()); + return new THBaseThreadPoolExecutor(minWorkers, maxWorkers, + Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build(), metrics); } private InetAddress getBindAddress(Configuration conf) http://git-wip-us.apache.org/repos/asf/hbase/blob/2eb810d0/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java ---------------------------------------------------------------------- diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java index e93a390..e0d8613 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -58,6 +59,7 @@ import org.apache.hadoop.hbase.security.SecurityUtil; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.thrift.CallQueue; import org.apache.hadoop.hbase.thrift.CallQueue.Call; +import org.apache.hadoop.hbase.thrift.THBaseThreadPoolExecutor; import org.apache.hadoop.hbase.thrift.ThriftMetrics; import org.apache.hadoop.hbase.thrift2.generated.THBaseService; import org.apache.hadoop.hbase.util.DNS; @@ -312,8 +314,8 @@ public class ThriftServer { ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); tfb.setDaemon(true); tfb.setNameFormat("thrift2-worker-%d"); - ThreadPoolExecutor pool = new ThreadPoolExecutor(workerThreads, workerThreads, - Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build()); + ThreadPoolExecutor pool = new THBaseThreadPoolExecutor(workerThreads, workerThreads, + Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build(), metrics); pool.prestartAllCoreThreads(); return pool; } @@ -324,7 +326,8 @@ public class ThriftServer { int workerThreads, InetSocketAddress inetSocketAddress, int backlog, - int clientTimeout) + int clientTimeout, + ThriftMetrics metrics) throws TTransportException { TServerTransport serverTransport = new TServerSocket( new TServerSocket.ServerSocketTransportArgs(). @@ -338,6 +341,11 @@ public class ThriftServer { if (workerThreads > 0) { serverArgs.maxWorkerThreads(workerThreads); } + ThreadPoolExecutor executor = new THBaseThreadPoolExecutor(serverArgs.minWorkerThreads, + serverArgs.maxWorkerThreads, serverArgs.stopTimeoutVal, TimeUnit.SECONDS, + new SynchronousQueue<Runnable>(), metrics); + serverArgs.executorService(executor); + return new TThreadPoolServer(serverArgs); } @@ -568,7 +576,8 @@ public class ThriftServer { workerThreads, inetSocketAddress, backlog, - readTimeout); + readTimeout, + metrics); } final TServer tserver = server;