This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-3 by this push:
new 7987abfc4c8 HBASE-29494: Capture Scan RPC processing time and queuing
time in Scan Metrics (#7242)
7987abfc4c8 is described below
commit 7987abfc4c87e6295111de8badecf6acba607854
Author: sanjeet006py <[email protected]>
AuthorDate: Sat Sep 6 03:21:36 2025 +0530
HBASE-29494: Capture Scan RPC processing time and queuing time in Scan
Metrics (#7242)
Signed-off-by: Viraj Jasani <[email protected]>
Signed-off-by: Hari Krishna Dara <[email protected]>
---
.../client/metrics/ServerSideScanMetrics.java | 10 ++++
.../hadoop/hbase/regionserver/RSRpcServices.java | 52 ++++++++++-------
.../hadoop/hbase/regionserver/ScannerContext.java | 16 +++++-
.../hadoop/hbase/client/TestTableScanMetrics.java | 67 +++++++++++++++++++++-
4 files changed, 121 insertions(+), 24 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
index b888da62cfe..582f0409f4c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
@@ -56,6 +56,8 @@ public class ServerSideScanMetrics {
currentRegionScanMetricsData.createCounter(BYTES_READ_FROM_BLOCK_CACHE_METRIC_NAME);
currentRegionScanMetricsData.createCounter(BYTES_READ_FROM_MEMSTORE_METRIC_NAME);
currentRegionScanMetricsData.createCounter(BLOCK_READ_OPS_COUNT_METRIC_NAME);
+ currentRegionScanMetricsData.createCounter(RPC_SCAN_PROCESSING_TIME_METRIC_NAME);
+ currentRegionScanMetricsData.createCounter(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME);
}
/**
@@ -77,6 +79,8 @@ public class ServerSideScanMetrics {
"BYTES_READ_FROM_BLOCK_CACHE";
public static final String BYTES_READ_FROM_MEMSTORE_METRIC_NAME =
"BYTES_READ_FROM_MEMSTORE";
public static final String BLOCK_READ_OPS_COUNT_METRIC_NAME =
"BLOCK_READ_OPS_COUNT";
+ public static final String RPC_SCAN_PROCESSING_TIME_METRIC_NAME =
"RPC_SCAN_PROCESSING_TIME";
+ public static final String RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME =
"RPC_SCAN_QUEUE_WAIT_TIME";
/**
* number of rows filtered during scan RPC
@@ -105,6 +109,12 @@ public class ServerSideScanMetrics {
public final AtomicLong blockReadOpsCount =
createCounter(BLOCK_READ_OPS_COUNT_METRIC_NAME);
+ public final AtomicLong rpcScanProcessingTime =
+ createCounter(RPC_SCAN_PROCESSING_TIME_METRIC_NAME);
+
+ public final AtomicLong rpcScanQueueWaitTime =
+ createCounter(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME);
+
/**
* Sets counter with counterName to passed in value, does nothing if counter
does not exist. If
* region level scan metrics are enabled then sets the value of counter for
the current region
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index fa0c2fd3ff1..bd232addcec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -3316,7 +3316,8 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
// return whether we have more results in region.
private void scan(HBaseRpcController controller, ScanRequest request,
RegionScannerHolder rsh,
long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result>
results,
- ScanResponse.Builder builder, RpcCall rpcCall) throws IOException {
+ ScanResponse.Builder builder, RpcCall rpcCall, ServerSideScanMetrics
scanMetrics)
+ throws IOException {
HRegion region = rsh.r;
RegionScanner scanner = rsh.s;
long maxResultSize;
@@ -3369,8 +3370,6 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
final LimitScope timeScope =
allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS :
LimitScope.BETWEEN_ROWS;
- boolean trackMetrics = request.hasTrackScanMetrics() &&
request.getTrackScanMetrics();
-
// Configure with limits for this RPC. Set keep progress true since
size progress
// towards size limit should be kept between calls to nextRaw
ScannerContext.Builder contextBuilder =
ScannerContext.newBuilder(true);
@@ -3392,7 +3391,8 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize,
maxBlockSize);
contextBuilder.setBatchLimit(scanner.getBatch());
contextBuilder.setTimeLimit(timeScope, timeLimit);
- contextBuilder.setTrackMetrics(trackMetrics);
+ contextBuilder.setTrackMetrics(scanMetrics != null);
+ contextBuilder.setScanMetrics(scanMetrics);
ScannerContext scannerContext = contextBuilder.build();
boolean limitReached = false;
long blockBytesScannedBefore = 0;
@@ -3514,27 +3514,15 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
builder.setMoreResultsInRegion(moreRows);
// Check to see if the client requested that we track metrics server
side. If the
// client requested metrics, retrieve the metrics from the scanner
context.
- if (trackMetrics) {
+ if (scanMetrics != null) {
// rather than increment yet another counter in StoreScanner, just
set the value here
// from block size progress before writing into the response
- scannerContext.getMetrics().setCounter(
- ServerSideScanMetrics.BLOCK_BYTES_SCANNED_KEY_METRIC_NAME,
+ scanMetrics.setCounter(ServerSideScanMetrics.BLOCK_BYTES_SCANNED_KEY_METRIC_NAME,
scannerContext.getBlockSizeProgress());
if (rpcCall != null) {
- scannerContext.getMetrics().setCounter(ServerSideScanMetrics.FS_READ_TIME_METRIC_NAME,
+ scanMetrics.setCounter(ServerSideScanMetrics.FS_READ_TIME_METRIC_NAME,
rpcCall.getFsReadTime());
}
- Map<String, Long> metrics =
scannerContext.getMetrics().getMetricsMap();
- ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
- NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
-
- for (Entry<String, Long> entry : metrics.entrySet()) {
- pairBuilder.setName(entry.getKey());
- pairBuilder.setValue(entry.getValue());
- metricBuilder.addMetrics(pairBuilder.build());
- }
-
- builder.setScanMetrics(metricBuilder.build());
}
}
} finally {
@@ -3671,6 +3659,8 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
boolean scannerClosed = false;
try {
List<Result> results = new ArrayList<>(Math.min(rows, 512));
+ boolean trackMetrics = request.hasTrackScanMetrics() &&
request.getTrackScanMetrics();
+ ServerSideScanMetrics scanMetrics = trackMetrics ? new
ServerSideScanMetrics() : null;
if (rows > 0) {
boolean done = false;
// Call coprocessor. Get region info from scanner.
@@ -3690,7 +3680,7 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
}
if (!done) {
scan((HBaseRpcController) controller, request, rsh,
maxQuotaResultSize, rows, limitOfRows,
- results, builder, rpcCall);
+ results, builder, rpcCall, scanMetrics);
} else {
builder.setMoreResultsInRegion(!results.isEmpty());
}
@@ -3742,6 +3732,28 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
throw new TimeoutIOException("Client deadline exceeded, cannot return
results");
}
+ if (scanMetrics != null) {
+ if (rpcCall != null) {
+ long rpcScanTime = EnvironmentEdgeManager.currentTime() - rpcCall.getStartTime();
+ long rpcQueueWaitTime = rpcCall.getStartTime() - rpcCall.getReceiveTime();
+ scanMetrics.addToCounter(ServerSideScanMetrics.RPC_SCAN_PROCESSING_TIME_METRIC_NAME,
+ rpcScanTime);
+ scanMetrics.addToCounter(ServerSideScanMetrics.RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME,
+ rpcQueueWaitTime);
+ }
+ Map<String, Long> metrics = scanMetrics.getMetricsMap();
+ ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
+ NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
+
+ for (Entry<String, Long> entry : metrics.entrySet()) {
+ pairBuilder.setName(entry.getKey());
+ pairBuilder.setValue(entry.getValue());
+ metricBuilder.addMetrics(pairBuilder.build());
+ }
+
+ builder.setScanMetrics(metricBuilder.build());
+ }
+
return builder.build();
} catch (IOException e) {
try {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
index 03d84f209b0..c681e91c615 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
@@ -124,6 +124,11 @@ public class ScannerContext {
final ServerSideScanMetrics metrics;
ScannerContext(boolean keepProgress, LimitFields limitsToCopy, boolean
trackMetrics) {
+ this(keepProgress, limitsToCopy, trackMetrics, null);
+ }
+
+ ScannerContext(boolean keepProgress, LimitFields limitsToCopy, boolean
trackMetrics,
+ ServerSideScanMetrics scanMetrics) {
this.limits = new LimitFields();
if (limitsToCopy != null) {
this.limits.copy(limitsToCopy);
@@ -134,7 +139,8 @@ public class ScannerContext {
this.keepProgress = keepProgress;
this.scannerState = DEFAULT_STATE;
- this.metrics = trackMetrics ? new ServerSideScanMetrics() : null;
+ this.metrics =
+ trackMetrics ? (scanMetrics != null ? scanMetrics : new
ServerSideScanMetrics()) : null;
}
public boolean isTrackingMetrics() {
@@ -417,6 +423,7 @@ public class ScannerContext {
boolean keepProgress = DEFAULT_KEEP_PROGRESS;
boolean trackMetrics = false;
LimitFields limits = new LimitFields();
+ ServerSideScanMetrics scanMetrics = null;
private Builder() {
}
@@ -455,8 +462,13 @@ public class ScannerContext {
return this;
}
+ public Builder setScanMetrics(ServerSideScanMetrics scanMetrics) {
+ this.scanMetrics = scanMetrics;
+ return this;
+ }
+
public ScannerContext build() {
- return new ScannerContext(keepProgress, limits, trackMetrics);
+ return new ScannerContext(keepProgress, limits, trackMetrics,
scanMetrics);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java
index bcbf625f1e1..c5186ad79c3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java
@@ -21,6 +21,8 @@ import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_RETRIES_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.RPC_SCAN_PROCESSING_TIME_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
@@ -34,16 +36,20 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.junit.AfterClass;
@@ -55,7 +61,7 @@ import org.junit.experimental.categories.Category;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
-@Category({ ClientTests.class, MediumTests.class })
+@Category({ ClientTests.class, LargeTests.class })
public class TestTableScanMetrics extends FromClientSideBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
@@ -330,6 +336,8 @@ public class TestTableScanMetrics extends FromClientSideBase {
.entrySet()) {
ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
Map<String, Long> metricsMap = entry.getValue();
+ metricsMap.remove(RPC_SCAN_PROCESSING_TIME_METRIC_NAME);
+ metricsMap.remove(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME);
Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
Assert.assertEquals(1, (long)
metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
@@ -347,6 +355,59 @@ public class TestTableScanMetrics extends FromClientSideBase {
}
}
+ @Test
+ public void testRPCCallProcessingAndQueueWaitTimeMetrics() throws Exception {
+ final int numThreads = 20;
+ Configuration conf = TEST_UTIL.getConfiguration();
+ // Handler count is 3 by default.
+ int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
+ HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
+ // Keep the number of threads to be high enough for RPC calls to queue up.
For now going with 6
+ // times the handler count.
+ Assert.assertTrue(numThreads > 6 * handlerCount);
+ ThreadPoolExecutor executor = (ThreadPoolExecutor)
Executors.newFixedThreadPool(numThreads);
+ TableName tableName = TableName.valueOf(
+ TestTableScanMetrics.class.getSimpleName() +
"_testRPCCallProcessingAndQueueWaitTimeMetrics");
+ AtomicLong totalScanRpcTime = new AtomicLong(0);
+ AtomicLong totalQueueWaitTime = new AtomicLong(0);
+ CountDownLatch latch = new CountDownLatch(numThreads);
+ try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
+ TEST_UTIL.loadTable(table, CF);
+ for (int i = 0; i < numThreads; i++) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+ scan.setEnableScanMetricsByRegion(true);
+ scan.setCaching(2);
+ try (ResultScanner rs = table.getScanner(scan)) {
+ Result r;
+ while ((r = rs.next()) != null) {
+ Assert.assertFalse(r.isEmpty());
+ }
+ ScanMetrics scanMetrics = rs.getScanMetrics();
+ Map<String, Long> metricsMap = scanMetrics.getMetricsMap();
+ totalScanRpcTime.addAndGet(metricsMap.get(RPC_SCAN_PROCESSING_TIME_METRIC_NAME));
+ totalQueueWaitTime.addAndGet(metricsMap.get(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME));
+ }
+ latch.countDown();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ }
+ latch.await();
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+ Assert.assertTrue(totalScanRpcTime.get() > 0);
+ Assert.assertTrue(totalQueueWaitTime.get() > 0);
+ } finally {
+ TEST_UTIL.deleteTable(tableName);
+ }
+ }
+
@Test
public void testScanMetricsByRegionWithRegionMove() throws Exception {
TableName tableName = TableName.valueOf(
@@ -578,6 +639,8 @@ public class TestTableScanMetrics extends FromClientSideBase {
for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry :
srcMap.entrySet()) {
ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
Map<String, Long> metricsMap = entry.getValue();
+ metricsMap.remove(RPC_SCAN_PROCESSING_TIME_METRIC_NAME);
+ metricsMap.remove(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME);
if (dstMap.containsKey(scanMetricsRegionInfo)) {
Map<String, Long> dstMetricsMap = dstMap.get(scanMetricsRegionInfo);
for (Map.Entry<String, Long> metricEntry : metricsMap.entrySet()) {