This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new bac3198e599 HBASE-27853 Add client side table metrics for rpc calls and request latency. (#5228)
bac3198e599 is described below

commit bac3198e5996b5ebcca6fcfec75fe3f466b9d38d
Author: Fantasy-Jay <13631435...@163.com>
AuthorDate: Wed Sep 13 02:16:42 2023 +0800

    HBASE-27853 Add client side table metrics for rpc calls and request latency. (#5228)
    
    Signed-off-by: Bryan Beaudreault <bbeaudrea...@apache.org>
---
 .../hbase/client/AsyncBatchRpcRetryingCaller.java  |   2 +-
 .../hadoop/hbase/client/AsyncConnectionImpl.java   |   4 +-
 .../hbase/client/AsyncRpcRetryingCaller.java       |   2 +-
 .../AsyncScanSingleRegionRpcRetryingCaller.java    |   6 +-
 .../hadoop/hbase/client/ConnectionUtils.java       |   6 +-
 .../hadoop/hbase/client/MetricsConnection.java     |  70 ++++++--
 .../apache/hadoop/hbase/ipc/AbstractRpcClient.java |   2 +-
 .../hbase/ipc/DelegatingHBaseRpcController.java    |  10 ++
 .../hadoop/hbase/ipc/HBaseRpcController.java       |  10 ++
 .../hadoop/hbase/ipc/HBaseRpcControllerImpl.java   |  13 ++
 .../hadoop/hbase/client/TestMetricsConnection.java | 199 +++++++++++++++------
 .../hbase/client/TestClientTableMetrics.java       | 148 +++++++++++++++
 12 files changed, 400 insertions(+), 72 deletions(-)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index c485a0a2c05..4b28d4cd4e2 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -395,7 +395,7 @@ class AsyncBatchRpcRetryingCaller<T> {
     }
     HBaseRpcController controller = conn.rpcControllerFactory.newController();
     resetController(controller, Math.min(rpcTimeoutNs, remainingNs),
-      calcPriority(serverReq.getPriority(), tableName));
+      calcPriority(serverReq.getPriority(), tableName), tableName);
     controller.setRequestAttributes(requestAttributes);
     if (!cells.isEmpty()) {
       controller.setCellScanner(createCellScanner(cells));
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 4900581c69a..3f0e3e0b370 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -144,8 +144,8 @@ public class AsyncConnectionImpl implements AsyncConnection 
{
     this.connConf = new AsyncConnectionConfiguration(conf);
     this.registry = registry;
     if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
-      this.metrics =
-        Optional.of(MetricsConnection.getMetricsConnection(metricsScope, () -> 
null, () -> null));
+      this.metrics = Optional
+        .of(MetricsConnection.getMetricsConnection(conf, metricsScope, () -> 
null, () -> null));
     } else {
       this.metrics = Optional.empty();
     }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
index c3dd8740854..32da6eedd10 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
@@ -121,7 +121,7 @@ public abstract class AsyncRpcRetryingCaller<T> {
     } else {
       callTimeoutNs = rpcTimeoutNs;
     }
-    resetController(controller, callTimeoutNs, priority);
+    resetController(controller, callTimeoutNs, priority, 
getTableName().orElse(null));
   }
 
   private void tryScheduleRetry(Throwable error) {
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index a5d4ef6407e..7e3c4340947 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -354,7 +354,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
 
   private void closeScanner() {
     incRPCCallsMetrics(scanMetrics, regionServerRemote);
-    resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS);
+    resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS, 
loc.getRegion().getTable());
     ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, 
true, false);
     stub.scan(controller, req, resp -> {
       if (controller.failed()) {
@@ -574,7 +574,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     if (tries > 1) {
       incRPCRetriesMetrics(scanMetrics, regionServerRemote);
     }
-    resetController(controller, callTimeoutNs, priority);
+    resetController(controller, callTimeoutNs, priority, 
loc.getRegion().getTable());
     ScanRequest req = RequestConverter.buildScanRequest(scannerId, 
scan.getCaching(), false,
       nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit());
     final Context context = Context.current();
@@ -596,7 +596,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
   private void renewLease() {
     incRPCCallsMetrics(scanMetrics, regionServerRemote);
     nextCallSeq++;
-    resetController(controller, rpcTimeoutNs, priority);
+    resetController(controller, rpcTimeoutNs, priority, 
loc.getRegion().getTable());
     ScanRequest req =
       RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, 
false, true, -1);
     stub.scan(controller, req, resp -> {
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 4732da6f04e..4827708a02e 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -196,13 +196,17 @@ public final class ConnectionUtils {
     return Bytes.equals(row, EMPTY_END_ROW);
   }
 
-  static void resetController(HBaseRpcController controller, long timeoutNs, 
int priority) {
+  static void resetController(HBaseRpcController controller, long timeoutNs, 
int priority,
+    TableName tableName) {
     controller.reset();
     if (timeoutNs >= 0) {
       controller.setCallTimeout(
         (int) Math.min(Integer.MAX_VALUE, 
TimeUnit.NANOSECONDS.toMillis(timeoutNs)));
     }
     controller.setPriority(priority);
+    if (tableName != null) {
+      controller.setTableName(tableName);
+    }
   }
 
   static Throwable translateException(Throwable t) {
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
index 8a299dc4e5c..d4edf018d6d 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
@@ -34,8 +34,10 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -51,10 +53,10 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationPr
  * This class is for maintaining the various connection statistics and 
publishing them through the
  * metrics interfaces. This class manages its own {@link MetricRegistry} and 
{@link JmxReporter} so
  * as to not conflict with other uses of Yammer Metrics within the client 
application. Calling
- * {@link #getMetricsConnection(String, Supplier, Supplier)} implicitly 
creates and "starts"
- * instances of these classes; be sure to call {@link 
#deleteMetricsConnection(String)} to terminate
- * the thread pools they allocate. The metrics reporter will be shutdown 
{@link #shutdown()} when
- * all connections within this metrics instances are closed.
+ * {@link #getMetricsConnection(Configuration, String, Supplier, Supplier)} 
implicitly creates and
+ * "starts" instances of these classes; be sure to call {@link 
#deleteMetricsConnection(String)} to
+ * terminate the thread pools they allocate. The metrics reporter will be 
shutdown
+ * {@link #shutdown()} when all connections within this metrics instances are 
closed.
  */
 @InterfaceAudience.Private
 public final class MetricsConnection implements StatisticTrackable {
@@ -62,11 +64,11 @@ public final class MetricsConnection implements 
StatisticTrackable {
   private static final ConcurrentMap<String, MetricsConnection> 
METRICS_INSTANCES =
     new ConcurrentHashMap<>();
 
-  static MetricsConnection getMetricsConnection(final String scope,
+  static MetricsConnection getMetricsConnection(final Configuration conf, 
final String scope,
     Supplier<ThreadPoolExecutor> batchPool, Supplier<ThreadPoolExecutor> 
metaPool) {
     return METRICS_INSTANCES.compute(scope, (s, metricsConnection) -> {
       if (metricsConnection == null) {
-        MetricsConnection newMetricsConn = new MetricsConnection(scope, 
batchPool, metaPool);
+        MetricsConnection newMetricsConn = new MetricsConnection(conf, scope, 
batchPool, metaPool);
         newMetricsConn.incrConnectionCount();
         return newMetricsConn;
       } else {
@@ -91,6 +93,10 @@ public final class MetricsConnection implements 
StatisticTrackable {
   /** Set this key to {@code true} to enable metrics collection of client 
requests. */
   public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = 
"hbase.client.metrics.enable";
 
+  /** Set this key to {@code true} to enable table metrics collection of 
client requests. */
+  public static final String CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY =
+    "hbase.client.table.metrics.enable";
+
   /**
    * Set to specify a custom scope for the metrics published through {@link 
MetricsConnection}. The
    * scope is added to JMX MBean objectName, and defaults to a combination of 
the Connection's
@@ -311,6 +317,7 @@ public final class MetricsConnection implements 
StatisticTrackable {
   private final MetricRegistry registry;
   private final JmxReporter reporter;
   private final String scope;
+  private final boolean tableMetricsEnabled;
 
   private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
     @Override
@@ -374,9 +381,10 @@ public final class MetricsConnection implements 
StatisticTrackable {
   private final ConcurrentMap<String, Counter> rpcCounters =
     new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
 
-  private MetricsConnection(String scope, Supplier<ThreadPoolExecutor> 
batchPool,
-    Supplier<ThreadPoolExecutor> metaPool) {
+  private MetricsConnection(Configuration conf, String scope,
+    Supplier<ThreadPoolExecutor> batchPool, Supplier<ThreadPoolExecutor> 
metaPool) {
     this.scope = scope;
+    this.tableMetricsEnabled = 
conf.getBoolean(CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY, false);
     addThreadPools(batchPool, metaPool);
     this.registry = new MetricRegistry();
     this.registry.register(getExecutorPoolName(), new RatioGauge() {
@@ -506,6 +514,16 @@ public final class MetricsConnection implements 
StatisticTrackable {
     return rpcCounters;
   }
 
+  /** rpcTimers metric */
+  public ConcurrentMap<String, Timer> getRpcTimers() {
+    return rpcTimers;
+  }
+
+  /** rpcHistograms metric */
+  public ConcurrentMap<String, Histogram> getRpcHistograms() {
+    return rpcHistograms;
+  }
+
   /** getTracker metric */
   public CallTracker getGetTracker() {
     return getTracker;
@@ -646,7 +664,8 @@ public final class MetricsConnection implements 
StatisticTrackable {
   }
 
   /** Report RPC context to metrics system. */
-  public void updateRpc(MethodDescriptor method, Message param, CallStats 
stats, Throwable e) {
+  public void updateRpc(MethodDescriptor method, TableName tableName, Message 
param,
+    CallStats stats, Throwable e) {
     int callsPerServer = stats.getConcurrentCallsPerServer();
     if (callsPerServer > 0) {
       concurrentCallsPerServerHist.update(callsPerServer);
@@ -696,6 +715,7 @@ public final class MetricsConnection implements 
StatisticTrackable {
         case 0:
           assert "Get".equals(method.getName());
           getTracker.updateRpc(stats);
+          updateTableMetric(methodName.toString(), tableName, stats, e);
           return;
         case 1:
           assert "Mutate".equals(method.getName());
@@ -703,22 +723,25 @@ public final class MetricsConnection implements 
StatisticTrackable {
           switch (mutationType) {
             case APPEND:
               appendTracker.updateRpc(stats);
-              return;
+              break;
             case DELETE:
               deleteTracker.updateRpc(stats);
-              return;
+              break;
             case INCREMENT:
               incrementTracker.updateRpc(stats);
-              return;
+              break;
             case PUT:
               putTracker.updateRpc(stats);
-              return;
+              break;
             default:
               throw new RuntimeException("Unrecognized mutation type " + 
mutationType);
           }
+          updateTableMetric(methodName.toString(), tableName, stats, e);
+          return;
         case 2:
           assert "Scan".equals(method.getName());
           scanTracker.updateRpc(stats);
+          updateTableMetric(methodName.toString(), tableName, stats, e);
           return;
         case 3:
           assert "BulkLoadHFile".equals(method.getName());
@@ -744,6 +767,7 @@ public final class MetricsConnection implements 
StatisticTrackable {
           assert "Multi".equals(method.getName());
           numActionsPerServerHist.update(stats.getNumActionsPerServer());
           multiTracker.updateRpc(stats);
+          updateTableMetric(methodName.toString(), tableName, stats, e);
           return;
         default:
           throw new RuntimeException("Unrecognized ClientService RPC type " + 
method.getFullName());
@@ -753,6 +777,26 @@ public final class MetricsConnection implements 
StatisticTrackable {
     updateRpcGeneric(methodName.toString(), stats);
   }
 
+  /** Report table rpc context to metrics system. */
+  private void updateTableMetric(String methodName, TableName tableName, 
CallStats stats,
+    Throwable e) {
+    if (tableMetricsEnabled) {
+      if (methodName != null) {
+        String table = tableName != null && 
StringUtils.isNotEmpty(tableName.getNameAsString())
+          ? tableName.getNameAsString()
+          : "unknown";
+        String metricKey = methodName + "_" + table;
+        // update table rpc context to metrics system,
+        // includes rpc call duration, rpc call request/response size(bytes).
+        updateRpcGeneric(metricKey, stats);
+        if (e != null) {
+          // rpc failure call counter with table name.
+          getMetric(FAILURE_CNT_BASE + metricKey, rpcCounters, 
counterFactory).inc();
+        }
+      }
+    }
+  }
+
   public void incrCacheDroppingExceptions(Object exception) {
     getMetric(
       CACHE_BASE + (exception == null ? UNKNOWN_EXCEPTION : 
exception.getClass().getSimpleName()),
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 5e42558671b..fcded9f5b69 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -379,7 +379,7 @@ public abstract class AbstractRpcClient<T extends 
RpcConnection> implements RpcC
     RpcCallback<Message> callback) {
     call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - 
call.getStartTime());
     if (metrics != null) {
-      metrics.updateRpc(call.md, call.param, call.callStats, call.error);
+      metrics.updateRpc(call.md, hrc.getTableName(), call.param, 
call.callStats, call.error);
     }
     if (LOG.isTraceEnabled()) {
       LOG.trace("CallId: {}, call: {}, startTime: {}ms, callTime: {}ms, 
status: {}", call.id,
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java
index c752f4c1835..2b8839bf846 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java
@@ -143,4 +143,14 @@ public class DelegatingHBaseRpcController implements 
HBaseRpcController {
     throws IOException {
     delegate.notifyOnCancel(callback, action);
   }
+
+  @Override
+  public void setTableName(TableName tableName) {
+    delegate.setTableName(tableName);
+  }
+
+  @Override
+  public TableName getTableName() {
+    return delegate.getTableName();
+  }
 }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
index cd303a5eda7..4d3e038bb5e 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
@@ -130,4 +130,14 @@ public interface HBaseRpcController extends RpcController, 
CellScannable {
   default RegionInfo getRegionInfo() {
     return null;
   }
+
+  /** Sets Region's table name. */
+  default void setTableName(TableName tableName) {
+
+  }
+
+  /** Returns Region's table name or null if not available or pertinent. */
+  default TableName getTableName() {
+    return null;
+  }
 }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
index 425c5e77afc..54e9310b5ae 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
@@ -51,6 +51,8 @@ public class HBaseRpcControllerImpl implements 
HBaseRpcController {
 
   private IOException exception;
 
+  private TableName tableName;
+
   /**
    * Rpc target Region's RegionInfo we are going against. May be null.
    * @see #hasRegionInfo()
@@ -144,6 +146,7 @@ public class HBaseRpcControllerImpl implements 
HBaseRpcController {
     exception = null;
     callTimeout = null;
     regionInfo = null;
+    tableName = null;
     // In the implementations of some callable with replicas, rpc calls are 
executed in a executor
     // and we could cancel the operation from outside which means there could 
be a race between
     // reset and startCancel. Although I think the race should be handled by 
the callable since the
@@ -273,4 +276,14 @@ public class HBaseRpcControllerImpl implements 
HBaseRpcController {
       action.run(false);
     }
   }
+
+  @Override
+  public void setTableName(TableName tableName) {
+    this.tableName = tableName;
+  }
+
+  @Override
+  public TableName getTableName() {
+    return tableName;
+  }
 }
diff --git 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java
 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java
index 2afdc7ee558..e0d18f6bbb7 100644
--- 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java
+++ 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java
@@ -18,19 +18,23 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import com.codahale.metrics.Counter;
 import com.codahale.metrics.RatioGauge;
 import com.codahale.metrics.RatioGauge.Ratio;
+import com.codahale.metrics.Timer;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ipc.CallTimeoutException;
 import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
 import org.apache.hadoop.hbase.security.User;
@@ -38,15 +42,20 @@ import 
org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MetricsTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 
 import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
@@ -56,25 +65,37 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanReques
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 
+@RunWith(Parameterized.class)
 @Category({ ClientTests.class, MetricsTests.class, SmallTests.class })
 public class TestMetricsConnection {
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestMetricsConnection.class);
 
+  private static final Configuration conf = new Configuration();
   private static MetricsConnection METRICS;
   private static final ThreadPoolExecutor BATCH_POOL =
     (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
 
   private static final String MOCK_CONN_STR = "mocked-connection";
 
-  @BeforeClass
-  public static void beforeClass() {
-    METRICS = MetricsConnection.getMetricsConnection(MOCK_CONN_STR, () -> 
BATCH_POOL, () -> null);
+  @Parameter()
+  public boolean tableMetricsEnabled;
+
+  @Parameters
+  public static List<Boolean> params() {
+    return Arrays.asList(false, true);
+  }
+
+  @Before
+  public void before() {
+    conf.setBoolean(MetricsConnection.CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY, 
tableMetricsEnabled);
+    METRICS =
+      MetricsConnection.getMetricsConnection(conf, MOCK_CONN_STR, () -> 
BATCH_POOL, () -> null);
   }
 
-  @AfterClass
-  public static void afterClass() {
+  @After
+  public void after() {
     MetricsConnection.deleteMetricsConnection(MOCK_CONN_STR);
   }
 
@@ -146,35 +167,52 @@ public class TestMetricsConnection {
   @Test
   public void testStaticMetrics() throws IOException {
     final byte[] foo = Bytes.toBytes("foo");
-    final RegionSpecifier region = 
RegionSpecifier.newBuilder().setValue(ByteString.EMPTY)
-      .setType(RegionSpecifierType.REGION_NAME).build();
+    String table = "TableX";
+    final RegionSpecifier region = RegionSpecifier.newBuilder()
+      
.setValue(ByteString.copyFromUtf8(table)).setType(RegionSpecifierType.REGION_NAME).build();
     final int loop = 5;
 
     for (int i = 0; i < loop; i++) {
       METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Get"),
-        GetRequest.getDefaultInstance(), MetricsConnection.newCallStats(), 
null);
+        TableName.valueOf(table),
+        
GetRequest.newBuilder().setRegion(region).setGet(ProtobufUtil.toGet(new 
Get(foo))).build(),
+        MetricsConnection.newCallStats(), null);
       METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Scan"),
-        ScanRequest.getDefaultInstance(), MetricsConnection.newCallStats(),
+        TableName.valueOf(table),
+        ScanRequest.newBuilder().setRegion(region)
+          .setScan(ProtobufUtil.toScan(new Scan(new Get(foo)))).build(),
+        MetricsConnection.newCallStats(),
         new RemoteWithExtrasException("java.io.IOException", null, false, 
false));
       
METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Multi"),
-        MultiRequest.getDefaultInstance(), MetricsConnection.newCallStats(),
+        TableName.valueOf(table),
+        MultiRequest.newBuilder()
+          .addRegionAction(ClientProtos.RegionAction.newBuilder()
+            .addAction(
+              ClientProtos.Action.newBuilder().setGet(ProtobufUtil.toGet(new 
Get(foo))).build())
+            .setRegion(region).build())
+          .build(),
+        MetricsConnection.newCallStats(),
         new CallTimeoutException("test with CallTimeoutException"));
       
METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
+        TableName.valueOf(table),
         MutateRequest.newBuilder()
           .setMutation(ProtobufUtil.toMutation(MutationType.APPEND, new 
Append(foo)))
           .setRegion(region).build(),
         MetricsConnection.newCallStats(), null);
       
METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
+        TableName.valueOf(table),
         MutateRequest.newBuilder()
           .setMutation(ProtobufUtil.toMutation(MutationType.DELETE, new 
Delete(foo)))
           .setRegion(region).build(),
         MetricsConnection.newCallStats(), null);
       
METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
+        TableName.valueOf(table),
         MutateRequest.newBuilder()
           .setMutation(ProtobufUtil.toMutation(MutationType.INCREMENT, new 
Increment(foo)))
           .setRegion(region).build(),
         MetricsConnection.newCallStats(), null);
       
METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
+        TableName.valueOf(table),
         MutateRequest.newBuilder()
           .setMutation(ProtobufUtil.toMutation(MutationType.PUT, new 
Put(foo))).setRegion(region)
           .build(),
@@ -182,48 +220,12 @@ public class TestMetricsConnection {
         new CallTimeoutException("test with CallTimeoutException"));
     }
 
-    final String rpcCountPrefix = "rpcCount_" + 
ClientService.getDescriptor().getName() + "_";
-    final String rpcFailureCountPrefix =
-      "rpcFailureCount_" + ClientService.getDescriptor().getName() + "_";
+    testRpcCallMetrics(table, loop);
+
     String metricKey;
     long metricVal;
     Counter counter;
 
-    for (String method : new String[] { "Get", "Scan", "Multi" }) {
-      metricKey = rpcCountPrefix + method;
-      metricVal = METRICS.getRpcCounters().get(metricKey).getCount();
-      assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, 
loop);
-
-      metricKey = rpcFailureCountPrefix + method;
-      counter = METRICS.getRpcCounters().get(metricKey);
-      metricVal = (counter != null) ? counter.getCount() : 0;
-      if (method.equals("Get")) {
-        // no failure
-        assertEquals("metric: " + metricKey + " val: " + metricVal, 0, 
metricVal);
-      } else {
-        // has failure
-        assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, 
loop);
-      }
-    }
-
-    String method = "Mutate";
-    for (String mutationType : new String[] { "Append", "Delete", "Increment", 
"Put" }) {
-      metricKey = rpcCountPrefix + method + "(" + mutationType + ")";
-      metricVal = METRICS.getRpcCounters().get(metricKey).getCount();
-      assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, 
loop);
-
-      metricKey = rpcFailureCountPrefix + method + "(" + mutationType + ")";
-      counter = METRICS.getRpcCounters().get(metricKey);
-      metricVal = (counter != null) ? counter.getCount() : 0;
-      if (mutationType.equals("Put")) {
-        // has failure
-        assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, 
loop);
-      } else {
-        // no failure
-        assertEquals("metric: " + metricKey + " val: " + metricVal, 0, 
metricVal);
-      }
-    }
-
     // remote exception
     metricKey = "rpcRemoteExceptions_IOException";
     counter = METRICS.getRpcCounters().get(metricKey);
@@ -242,6 +244,8 @@ public class TestMetricsConnection {
     metricVal = (counter != null) ? counter.getCount() : 0;
     assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, 
loop * 3);
 
+    testRpcCallTableMetrics(table, loop);
+
     for (MetricsConnection.CallTracker t : new MetricsConnection.CallTracker[] 
{
       METRICS.getGetTracker(), METRICS.getScanTracker(), 
METRICS.getMultiTracker(),
       METRICS.getAppendTracker(), METRICS.getDeleteTracker(), 
METRICS.getIncrementTracker(),
@@ -257,4 +261,99 @@ public class TestMetricsConnection {
     assertEquals(Ratio.of(0, 3).getValue(), executorMetrics.getValue(), 0);
     assertEquals(Double.NaN, metaMetrics.getValue(), 0);
   }
+
+  /**
+   * Checks the per-table RPC latency timers for the Get, Scan, Multi and Mutate(...) calls of the
+   * client service.
+   * @param table       table name used as the trailing component of every timer key
+   * @param expectedVal number of operations each timer must have recorded when table metrics are
+   *                    enabled
+   */
+  private void testRpcCallTableMetrics(String table, int expectedVal) {
+    String keyPrefix = "rpcCallDurationMs_" + ClientService.getDescriptor().getName() + "_";
+    for (String method : new String[] { "Get", "Scan", "Multi" }) {
+      assertRpcCallTableTimer(keyPrefix + method + "_" + table, expectedVal);
+    }
+
+    // Distinguish mutate types for mutate method.
+    for (String mutationType : new String[] { "Append", "Delete", "Increment", "Put" }) {
+      assertRpcCallTableTimer(keyPrefix + "Mutate(" + mutationType + ")_" + table, expectedVal);
+    }
+  }
+
+  /**
+   * Asserts on the single timer registered under {@code metricKey}: it must be absent when table
+   * metrics are disabled; otherwise it must exist, have recorded {@code expectedVal} operations and
+   * report non-negative 95th/99th percentiles.
+   */
+  private void assertRpcCallTableTimer(String metricKey, int expectedVal) {
+    Timer timer = METRICS.getRpcTimers().get(metricKey);
+    if (!tableMetricsEnabled) {
+      assertNull("metric: " + metricKey + " must not exist when table metrics are disabled",
+        timer);
+      return;
+    }
+    // Fail with the metric name rather than a bare NullPointerException when the timer is missing.
+    assertTrue("metric: " + metricKey + " is not registered", timer != null);
+    long numOps = timer.getCount();
+    double p95 = timer.getSnapshot().get95thPercentile();
+    double p99 = timer.getSnapshot().get99thPercentile();
+    assertEquals("metric: " + metricKey + "_num_ops" + " val: " + numOps, expectedVal, numOps);
+    assertTrue("metric: " + metricKey + "_95th_percentile" + " val: " + p95, p95 >= 0);
+    assertTrue("metric: " + metricKey + "_99th_percentile" + " val: " + p99, p99 >= 0);
+  }
+
+  /**
+   * Checks the RPC call counters and RPC failure counters of the client service for Get, Scan,
+   * Multi and every Mutate(...) type.
+   * @param table       table name appended to the failure-counter key when table metrics are
+   *                    enabled
+   * @param expectedVal expected number of calls per method; also the expected number of failures
+   *                    for the methods that are expected to fail
+   */
+  private void testRpcCallMetrics(String table, int expectedVal) {
+    final String service = ClientService.getDescriptor().getName();
+    final String rpcCountPrefix = "rpcCount_" + service + "_";
+    final String rpcFailureCountPrefix = "rpcFailureCount_" + service + "_";
+
+    for (String method : new String[] { "Get", "Scan", "Multi" }) {
+      // Get is the only one of these methods expected to have no failure.
+      int expectedFailures = method.equals("Get") ? 0 : expectedVal;
+      assertRpcCounters(rpcCountPrefix + method, rpcFailureCountPrefix + method, table,
+        expectedVal, expectedFailures);
+    }
+
+    for (String mutationType : new String[] { "Append", "Delete", "Increment", "Put" }) {
+      String call = "Mutate(" + mutationType + ")";
+      // Put is the only mutation type expected to have failures.
+      int expectedFailures = mutationType.equals("Put") ? expectedVal : 0;
+      assertRpcCounters(rpcCountPrefix + call, rpcFailureCountPrefix + call, table, expectedVal,
+        expectedFailures);
+    }
+  }
+
+  /**
+   * Asserts the call counter stored under {@code countKey} and the failure counter derived from
+   * {@code failureKeyBase}. A missing failure counter is treated as zero failures.
+   */
+  private void assertRpcCounters(String countKey, String failureKeyBase, String table,
+    int expectedCalls, int expectedFailures) {
+    // rpc call count
+    Counter calls = METRICS.getRpcCounters().get(countKey);
+    // Fail with the metric name rather than a bare NullPointerException when the counter is missing.
+    assertTrue("metric: " + countKey + " is not registered", calls != null);
+    long callCount = calls.getCount();
+    // JUnit order is (message, expected, actual); keep it so failure reports read correctly.
+    assertEquals("metric: " + countKey + " val: " + callCount, expectedCalls, callCount);
+
+    // rpc failure call: the key carries the table suffix only when table metrics are enabled
+    String failureKey = tableMetricsEnabled ? failureKeyBase + "_" + table : failureKeyBase;
+    Counter failures = METRICS.getRpcCounters().get(failureKey);
+    long failureCount = (failures != null) ? failures.getCount() : 0;
+    assertEquals("metric: " + failureKey + " val: " + failureCount, expectedFailures,
+      failureCount);
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTableMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTableMetrics.java
new file mode 100644
index 00000000000..c0980c51256
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTableMetrics.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.codahale.metrics.Timer;
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+
+/**
+ * Runs Get, Mutate, Scan and Multi calls against a mini cluster with client-side table metrics
+ * enabled and checks the RPC latency timers registered under keys of the form
+ * {@code rpcCallDurationMs_<service>_<method>_<table>}.
+ */
+@Category(MediumTests.class)
+public class TestClientTableMetrics {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestClientTableMetrics.class);
+
+  private static HBaseTestingUtil UTIL;
+  private static Connection CONN;
+  private static MetricsConnection METRICS;
+  private static final String tableName = "table_1";
+  private static final TableName TABLE_1 = TableName.valueOf(tableName);
+  private static final byte[] FAMILY = Bytes.toBytes("f");
+  private static final byte[] QUALIFIER = Bytes.toBytes("name");
+
+  /** Starts a two-node mini cluster with client and table metrics enabled and creates the table. */
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
+    conf.setBoolean(MetricsConnection.CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY, true);
+    UTIL = new HBaseTestingUtil(conf);
+    UTIL.startMiniCluster(2);
+    UTIL.createTable(TABLE_1, FAMILY);
+    UTIL.waitTableAvailable(TABLE_1);
+    CONN = UTIL.getConnection();
+    METRICS = ((AsyncConnectionImpl) CONN.toAsyncConnection()).getConnectionMetrics().get();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    UTIL.deleteTableIfAny(TABLE_1);
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testGetTableMetrics() throws IOException {
+    // try-with-resources closes the table even when one of the calls throws.
+    try (Table table = CONN.getTable(TABLE_1)) {
+      table.get(new Get(Bytes.toBytes("row1")));
+      table.get(new Get(Bytes.toBytes("row2")));
+      table.get(new Get(Bytes.toBytes("row3")));
+    }
+
+    verifyTableMetrics(timerKey("Get"), 3);
+  }
+
+  @Test
+  public void testMutateTableMetrics() throws IOException {
+    try (Table table = CONN.getTable(TABLE_1)) {
+      // PUT
+      table.put(newPut("row1", "tom"));
+      table.put(newPut("row2", "jerry"));
+      // DELETE
+      table.delete(new Delete(Bytes.toBytes("row1")));
+    }
+
+    verifyTableMetrics(timerKey("Mutate(Put)"), 2);
+    verifyTableMetrics(timerKey("Mutate(Delete)"), 1);
+  }
+
+  @Test
+  public void testScanTableMetrics() throws IOException {
+    // The scanner is a resource too; it used to be left open. The timer is checked while the
+    // scanner is still open, so closing it cannot influence the verified count.
+    try (Table table = CONN.getTable(TABLE_1);
+      ResultScanner scanner = table.getScanner(new Scan())) {
+      verifyTableMetrics(timerKey("Scan"), 1);
+    }
+  }
+
+  @Test
+  public void testMultiTableMetrics() throws IOException {
+    try (Table table = CONN.getTable(TABLE_1)) {
+      table.put(Arrays.asList(newPut("row1", "tom"), newPut("row2", "jerry")));
+      table.get(Arrays.asList(new Get(Bytes.toBytes("row1")), new Get(Bytes.toBytes("row2"))));
+    }
+
+    verifyTableMetrics(timerKey("Multi"), 2);
+  }
+
+  /** Builds the timer key for {@code method} of the client service on the test table. */
+  private static String timerKey(String method) {
+    return "rpcCallDurationMs_" + ClientService.getDescriptor().getName() + "_" + method + "_"
+      + tableName;
+  }
+
+  /** Builds a Put for {@code row} that stores {@code value} in the test family and qualifier. */
+  private static Put newPut(String row, String value) {
+    return new Put(Bytes.toBytes(row)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(value));
+  }
+
+  /**
+   * Asserts that the timer under {@code metricKey} exists, has recorded {@code expectedVal}
+   * operations and reports non-negative 95th/99th percentiles.
+   */
+  private static void verifyTableMetrics(String metricKey, int expectedVal) {
+    String numOpsSuffix = "_num_ops";
+    String p95Suffix = "_95th_percentile";
+    String p99Suffix = "_99th_percentile";
+    Timer timer = METRICS.getRpcTimers().get(metricKey);
+    // Fail with the metric name rather than a bare NullPointerException when the timer is missing.
+    assertTrue("metric: " + metricKey + " is not registered", timer != null);
+    long numOps = timer.getCount();
+    double p95 = timer.getSnapshot().get95thPercentile();
+    double p99 = timer.getSnapshot().get99thPercentile();
+    assertEquals("metric: " + metricKey + numOpsSuffix + " val: " + numOps, expectedVal, numOps);
+    assertTrue("metric: " + metricKey + p95Suffix + " val: " + p95, p95 >= 0);
+    assertTrue("metric: " + metricKey + p99Suffix + " val: " + p99, p99 >= 0);
+  }
+}


Reply via email to