This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new 7987abfc4c8 HBASE-29494: Capture Scan RPC processing time and queuing time in Scan Metrics (#7242)
7987abfc4c8 is described below

commit 7987abfc4c87e6295111de8badecf6acba607854
Author: sanjeet006py <[email protected]>
AuthorDate: Sat Sep 6 03:21:36 2025 +0530

    HBASE-29494: Capture Scan RPC processing time and queuing time in Scan Metrics (#7242)
    
    Signed-off-by: Viraj Jasani <[email protected]>
    Signed-off-by: Hari Krishna Dara <[email protected]>
---
 .../client/metrics/ServerSideScanMetrics.java      | 10 ++++
 .../hadoop/hbase/regionserver/RSRpcServices.java   | 52 ++++++++++-------
 .../hadoop/hbase/regionserver/ScannerContext.java  | 16 +++++-
 .../hadoop/hbase/client/TestTableScanMetrics.java  | 67 +++++++++++++++++++++-
 4 files changed, 121 insertions(+), 24 deletions(-)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
index b888da62cfe..582f0409f4c 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
@@ -56,6 +56,8 @@ public class ServerSideScanMetrics {
     
currentRegionScanMetricsData.createCounter(BYTES_READ_FROM_BLOCK_CACHE_METRIC_NAME);
     
currentRegionScanMetricsData.createCounter(BYTES_READ_FROM_MEMSTORE_METRIC_NAME);
     
currentRegionScanMetricsData.createCounter(BLOCK_READ_OPS_COUNT_METRIC_NAME);
+    
currentRegionScanMetricsData.createCounter(RPC_SCAN_PROCESSING_TIME_METRIC_NAME);
+    
currentRegionScanMetricsData.createCounter(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME);
   }
 
   /**
@@ -77,6 +79,8 @@ public class ServerSideScanMetrics {
     "BYTES_READ_FROM_BLOCK_CACHE";
   public static final String BYTES_READ_FROM_MEMSTORE_METRIC_NAME = 
"BYTES_READ_FROM_MEMSTORE";
   public static final String BLOCK_READ_OPS_COUNT_METRIC_NAME = 
"BLOCK_READ_OPS_COUNT";
+  public static final String RPC_SCAN_PROCESSING_TIME_METRIC_NAME = 
"RPC_SCAN_PROCESSING_TIME";
+  public static final String RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME = 
"RPC_SCAN_QUEUE_WAIT_TIME";
 
   /**
    * number of rows filtered during scan RPC
@@ -105,6 +109,12 @@ public class ServerSideScanMetrics {
 
   public final AtomicLong blockReadOpsCount = 
createCounter(BLOCK_READ_OPS_COUNT_METRIC_NAME);
 
+  public final AtomicLong rpcScanProcessingTime =
+    createCounter(RPC_SCAN_PROCESSING_TIME_METRIC_NAME);
+
+  public final AtomicLong rpcScanQueueWaitTime =
+    createCounter(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME);
+
   /**
    * Sets counter with counterName to passed in value, does nothing if counter 
does not exist. If
    * region level scan metrics are enabled then sets the value of counter for 
the current region
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index fa0c2fd3ff1..bd232addcec 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -3316,7 +3316,8 @@ public class RSRpcServices extends 
HBaseRpcServicesBase<HRegionServer>
   // return whether we have more results in region.
   private void scan(HBaseRpcController controller, ScanRequest request, 
RegionScannerHolder rsh,
     long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> 
results,
-    ScanResponse.Builder builder, RpcCall rpcCall) throws IOException {
+    ScanResponse.Builder builder, RpcCall rpcCall, ServerSideScanMetrics 
scanMetrics)
+    throws IOException {
     HRegion region = rsh.r;
     RegionScanner scanner = rsh.s;
     long maxResultSize;
@@ -3369,8 +3370,6 @@ public class RSRpcServices extends 
HBaseRpcServicesBase<HRegionServer>
         final LimitScope timeScope =
           allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : 
LimitScope.BETWEEN_ROWS;
 
-        boolean trackMetrics = request.hasTrackScanMetrics() && 
request.getTrackScanMetrics();
-
         // Configure with limits for this RPC. Set keep progress true since 
size progress
         // towards size limit should be kept between calls to nextRaw
         ScannerContext.Builder contextBuilder = 
ScannerContext.newBuilder(true);
@@ -3392,7 +3391,8 @@ public class RSRpcServices extends 
HBaseRpcServicesBase<HRegionServer>
         contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize, 
maxBlockSize);
         contextBuilder.setBatchLimit(scanner.getBatch());
         contextBuilder.setTimeLimit(timeScope, timeLimit);
-        contextBuilder.setTrackMetrics(trackMetrics);
+        contextBuilder.setTrackMetrics(scanMetrics != null);
+        contextBuilder.setScanMetrics(scanMetrics);
         ScannerContext scannerContext = contextBuilder.build();
         boolean limitReached = false;
         long blockBytesScannedBefore = 0;
@@ -3514,27 +3514,15 @@ public class RSRpcServices extends 
HBaseRpcServicesBase<HRegionServer>
         builder.setMoreResultsInRegion(moreRows);
         // Check to see if the client requested that we track metrics server 
side. If the
         // client requested metrics, retrieve the metrics from the scanner 
context.
-        if (trackMetrics) {
+        if (scanMetrics != null) {
           // rather than increment yet another counter in StoreScanner, just 
set the value here
           // from block size progress before writing into the response
-          scannerContext.getMetrics().setCounter(
-            ServerSideScanMetrics.BLOCK_BYTES_SCANNED_KEY_METRIC_NAME,
+          
scanMetrics.setCounter(ServerSideScanMetrics.BLOCK_BYTES_SCANNED_KEY_METRIC_NAME,
             scannerContext.getBlockSizeProgress());
           if (rpcCall != null) {
-            
scannerContext.getMetrics().setCounter(ServerSideScanMetrics.FS_READ_TIME_METRIC_NAME,
+            
scanMetrics.setCounter(ServerSideScanMetrics.FS_READ_TIME_METRIC_NAME,
               rpcCall.getFsReadTime());
           }
-          Map<String, Long> metrics = 
scannerContext.getMetrics().getMetricsMap();
-          ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
-          NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
-
-          for (Entry<String, Long> entry : metrics.entrySet()) {
-            pairBuilder.setName(entry.getKey());
-            pairBuilder.setValue(entry.getValue());
-            metricBuilder.addMetrics(pairBuilder.build());
-          }
-
-          builder.setScanMetrics(metricBuilder.build());
         }
       }
     } finally {
@@ -3671,6 +3659,8 @@ public class RSRpcServices extends 
HBaseRpcServicesBase<HRegionServer>
     boolean scannerClosed = false;
     try {
       List<Result> results = new ArrayList<>(Math.min(rows, 512));
+      boolean trackMetrics = request.hasTrackScanMetrics() && 
request.getTrackScanMetrics();
+      ServerSideScanMetrics scanMetrics = trackMetrics ? new 
ServerSideScanMetrics() : null;
       if (rows > 0) {
         boolean done = false;
         // Call coprocessor. Get region info from scanner.
@@ -3690,7 +3680,7 @@ public class RSRpcServices extends 
HBaseRpcServicesBase<HRegionServer>
         }
         if (!done) {
           scan((HBaseRpcController) controller, request, rsh, 
maxQuotaResultSize, rows, limitOfRows,
-            results, builder, rpcCall);
+            results, builder, rpcCall, scanMetrics);
         } else {
           builder.setMoreResultsInRegion(!results.isEmpty());
         }
@@ -3742,6 +3732,28 @@ public class RSRpcServices extends 
HBaseRpcServicesBase<HRegionServer>
         throw new TimeoutIOException("Client deadline exceeded, cannot return 
results");
       }
 
+      if (scanMetrics != null) {
+        if (rpcCall != null) {
+          long rpcScanTime = EnvironmentEdgeManager.currentTime() - 
rpcCall.getStartTime();
+          long rpcQueueWaitTime = rpcCall.getStartTime() - 
rpcCall.getReceiveTime();
+          
scanMetrics.addToCounter(ServerSideScanMetrics.RPC_SCAN_PROCESSING_TIME_METRIC_NAME,
+            rpcScanTime);
+          
scanMetrics.addToCounter(ServerSideScanMetrics.RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME,
+            rpcQueueWaitTime);
+        }
+        Map<String, Long> metrics = scanMetrics.getMetricsMap();
+        ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
+        NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
+
+        for (Entry<String, Long> entry : metrics.entrySet()) {
+          pairBuilder.setName(entry.getKey());
+          pairBuilder.setValue(entry.getValue());
+          metricBuilder.addMetrics(pairBuilder.build());
+        }
+
+        builder.setScanMetrics(metricBuilder.build());
+      }
+
       return builder.build();
     } catch (IOException e) {
       try {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
index 03d84f209b0..c681e91c615 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
@@ -124,6 +124,11 @@ public class ScannerContext {
   final ServerSideScanMetrics metrics;
 
   ScannerContext(boolean keepProgress, LimitFields limitsToCopy, boolean 
trackMetrics) {
+    this(keepProgress, limitsToCopy, trackMetrics, null);
+  }
+
+  ScannerContext(boolean keepProgress, LimitFields limitsToCopy, boolean 
trackMetrics,
+    ServerSideScanMetrics scanMetrics) {
     this.limits = new LimitFields();
     if (limitsToCopy != null) {
       this.limits.copy(limitsToCopy);
@@ -134,7 +139,8 @@ public class ScannerContext {
 
     this.keepProgress = keepProgress;
     this.scannerState = DEFAULT_STATE;
-    this.metrics = trackMetrics ? new ServerSideScanMetrics() : null;
+    this.metrics =
+      trackMetrics ? (scanMetrics != null ? scanMetrics : new 
ServerSideScanMetrics()) : null;
   }
 
   public boolean isTrackingMetrics() {
@@ -417,6 +423,7 @@ public class ScannerContext {
     boolean keepProgress = DEFAULT_KEEP_PROGRESS;
     boolean trackMetrics = false;
     LimitFields limits = new LimitFields();
+    ServerSideScanMetrics scanMetrics = null;
 
     private Builder() {
     }
@@ -455,8 +462,13 @@ public class ScannerContext {
       return this;
     }
 
+    public Builder setScanMetrics(ServerSideScanMetrics scanMetrics) {
+      this.scanMetrics = scanMetrics;
+      return this;
+    }
+
     public ScannerContext build() {
-      return new ScannerContext(keepProgress, limits, trackMetrics);
+      return new ScannerContext(keepProgress, limits, trackMetrics, 
scanMetrics);
     }
   }
 
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java
index bcbf625f1e1..c5186ad79c3 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java
@@ -21,6 +21,8 @@ import static 
org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
 import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
 import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_RETRIES_METRIC_NAME;
 import static 
org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
+import static 
org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.RPC_SCAN_PROCESSING_TIME_METRIC_NAME;
+import static 
org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
@@ -34,16 +36,20 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FutureUtils;
 import org.junit.AfterClass;
@@ -55,7 +61,7 @@ import org.junit.experimental.categories.Category;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
 
-@Category({ ClientTests.class, MediumTests.class })
+@Category({ ClientTests.class, LargeTests.class })
 public class TestTableScanMetrics extends FromClientSideBase {
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
@@ -330,6 +336,8 @@ public class TestTableScanMetrics extends 
FromClientSideBase {
           .entrySet()) {
           ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
           Map<String, Long> metricsMap = entry.getValue();
+          metricsMap.remove(RPC_SCAN_PROCESSING_TIME_METRIC_NAME);
+          metricsMap.remove(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME);
           Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
           Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
           Assert.assertEquals(1, (long) 
metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
@@ -347,6 +355,59 @@ public class TestTableScanMetrics extends 
FromClientSideBase {
     }
   }
 
+  @Test
+  public void testRPCCallProcessingAndQueueWaitTimeMetrics() throws Exception {
+    final int numThreads = 20;
+    Configuration conf = TEST_UTIL.getConfiguration();
+    // Handler count is 3 by default.
+    int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
+      HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
+    // Keep the number of threads to be high enough for RPC calls to queue up. 
For now going with 6
+    // times the handler count.
+    Assert.assertTrue(numThreads > 6 * handlerCount);
+    ThreadPoolExecutor executor = (ThreadPoolExecutor) 
Executors.newFixedThreadPool(numThreads);
+    TableName tableName = TableName.valueOf(
+      TestTableScanMetrics.class.getSimpleName() + 
"_testRPCCallProcessingAndQueueWaitTimeMetrics");
+    AtomicLong totalScanRpcTime = new AtomicLong(0);
+    AtomicLong totalQueueWaitTime = new AtomicLong(0);
+    CountDownLatch latch = new CountDownLatch(numThreads);
+    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
+      TEST_UTIL.loadTable(table, CF);
+      for (int i = 0; i < numThreads; i++) {
+        executor.execute(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+              scan.setEnableScanMetricsByRegion(true);
+              scan.setCaching(2);
+              try (ResultScanner rs = table.getScanner(scan)) {
+                Result r;
+                while ((r = rs.next()) != null) {
+                  Assert.assertFalse(r.isEmpty());
+                }
+                ScanMetrics scanMetrics = rs.getScanMetrics();
+                Map<String, Long> metricsMap = scanMetrics.getMetricsMap();
+                
totalScanRpcTime.addAndGet(metricsMap.get(RPC_SCAN_PROCESSING_TIME_METRIC_NAME));
+                
totalQueueWaitTime.addAndGet(metricsMap.get(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME));
+              }
+              latch.countDown();
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        });
+      }
+      latch.await();
+      executor.shutdown();
+      executor.awaitTermination(10, TimeUnit.SECONDS);
+      Assert.assertTrue(totalScanRpcTime.get() > 0);
+      Assert.assertTrue(totalQueueWaitTime.get() > 0);
+    } finally {
+      TEST_UTIL.deleteTable(tableName);
+    }
+  }
+
   @Test
   public void testScanMetricsByRegionWithRegionMove() throws Exception {
     TableName tableName = TableName.valueOf(
@@ -578,6 +639,8 @@ public class TestTableScanMetrics extends 
FromClientSideBase {
     for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : 
srcMap.entrySet()) {
       ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
       Map<String, Long> metricsMap = entry.getValue();
+      metricsMap.remove(RPC_SCAN_PROCESSING_TIME_METRIC_NAME);
+      metricsMap.remove(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME);
       if (dstMap.containsKey(scanMetricsRegionInfo)) {
         Map<String, Long> dstMetricsMap = dstMap.get(scanMetricsRegionInfo);
         for (Map.Entry<String, Long> metricEntry : metricsMap.entrySet()) {

Reply via email to