This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new 8ac9040355e HBASE-29834 MILLIS_BETWEEN_NEXTS metric is not updated on branch-3+ (#7660) (#7850)
8ac9040355e is described below
commit 8ac9040355ede51bd4f5f029b66f8650230cc60f
Author: Duo Zhang <[email protected]>
AuthorDate: Thu Mar 5 22:33:14 2026 +0800
HBASE-29834 MILLIS_BETWEEN_NEXTS metric is not updated on branch-3+ (#7660) (#7850)
Reviewed-by: Richárd Antal <[email protected]>
Signed-off-by: Viraj Jasani <[email protected]>
(cherry picked from commit 26f9f0b1657291cad5215f9201624dbd2629ab67)
---
.../hadoop/hbase/client/AsyncClientScanner.java | 61 +++++++++-------
.../client/AsyncRpcRetryingCallerFactory.java | 9 ++-
.../AsyncScanSingleRegionRpcRetryingCaller.java | 23 ++++--
.../hadoop/hbase/client/ConnectionUtils.java | 7 ++
.../hbase/client/TestAsyncTableScanMetrics.java | 81 ++++++++++++----------
5 files changed, 112 insertions(+), 69 deletions(-)
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
index d58c8e60c8d..8d3bcb000bc 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
@@ -21,6 +21,7 @@ import static
org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
import static
org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType;
+import static
org.apache.hadoop.hbase.client.ConnectionUtils.incMillisBetweenNextsMetrics;
import static
org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
import static
org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
import static
org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics;
@@ -202,40 +203,42 @@ class AsyncClientScanner {
}
}
- private void startScan(OpenScannerResponse resp) {
- addListener(
+ // lastNextCallNanos is used to calculate the MILLIS_BETWEEN_NEXTS scan metrics
+ private void startScan(OpenScannerResponse resp, long lastNextCallNanos) {
+ AsyncScanSingleRegionRpcRetryingCaller scanSingleRegionCaller =
conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
.remote(resp.isRegionServerRemote)
.scannerLeaseTimeoutPeriod(resp.resp.getTtl(),
TimeUnit.MILLISECONDS).stub(resp.stub)
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs,
TimeUnit.NANOSECONDS)
+ .lastNextCallNanos(lastNextCallNanos)
.pauseForServerOverloaded(pauseNsForServerOverloaded,
TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
- .setRequestAttributes(requestAttributes).start(resp.controller,
resp.resp),
- (hasMore, error) -> {
- try (Scope ignored = span.makeCurrent()) {
- if (error != null) {
- try {
- consumer.onError(error);
- return;
- } finally {
- TraceUtil.setError(span, error);
- span.end();
- }
+ .setRequestAttributes(requestAttributes).build();
+ addListener(scanSingleRegionCaller.start(resp.controller, resp.resp),
(hasMore, error) -> {
+ try (Scope ignored = span.makeCurrent()) {
+ if (error != null) {
+ try {
+ consumer.onError(error);
+ return;
+ } finally {
+ TraceUtil.setError(span, error);
+ span.end();
}
- if (hasMore) {
- openScanner();
- } else {
- try {
- consumer.onComplete();
- } finally {
- span.setStatus(StatusCode.OK);
- span.end();
- }
+ }
+ if (hasMore) {
+ openScanner(scanSingleRegionCaller);
+ } else {
+ try {
+ consumer.onComplete();
+ } finally {
+ span.setStatus(StatusCode.OK);
+ span.end();
}
}
- });
+ }
+ });
}
private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
@@ -256,11 +259,17 @@ class AsyncClientScanner {
: conn.connConf.getPrimaryScanTimeoutNs();
}
- private void openScanner() {
+ private void openScanner(AsyncScanSingleRegionRpcRetryingCaller
previousScanSingleRegionCaller) {
if (this.isScanMetricsByRegionEnabled) {
scanMetrics.moveToNextRegion();
}
incRegionCountMetrics(scanMetrics);
+ long openScannerStartNanos = System.nanoTime();
+ if (previousScanSingleRegionCaller != null) {
+ // open scanner is also a next call
+ incMillisBetweenNextsMetrics(scanMetrics, TimeUnit.NANOSECONDS
+ .toMillis(openScannerStartNanos -
previousScanSingleRegionCaller.getLastNextCallNanos()));
+ }
openScannerTries.set(1);
addListener(timelineConsistentRead(conn.getLocator(), tableName, scan,
scan.getStartRow(),
getLocateType(scan), this::openScanner, rpcTimeoutNs,
getPrimaryTimeoutNs(), retryTimer,
@@ -280,14 +289,14 @@ class AsyncClientScanner {
this.scanMetrics.initScanMetricsRegionInfo(loc.getRegion().getEncodedName(),
loc.getServerName());
}
- startScan(resp);
+ startScan(resp, openScannerStartNanos);
}
});
}
public void start() {
try (Scope ignored = span.makeCurrent()) {
- openScanner();
+ openScanner(null);
}
}
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index 1ea2a1ad7dd..50d9d36a2b5 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -209,6 +209,8 @@ class AsyncRpcRetryingCallerFactory {
private long rpcTimeoutNs;
+ private long lastNextCallNanos = System.nanoTime();
+
private int priority = PRIORITY_UNSET;
private Map<String, byte[]> requestAttributes = Collections.emptyMap();
@@ -275,6 +277,11 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
+ public ScanSingleRegionCallerBuilder lastNextCallNanos(long nanos) {
+ this.lastNextCallNanos = nanos;
+ return this;
+ }
+
public ScanSingleRegionCallerBuilder pauseForServerOverloaded(long pause,
TimeUnit unit) {
this.pauseNsForServerOverloaded = unit.toNanos(pause);
return this;
@@ -311,7 +318,7 @@ class AsyncRpcRetryingCallerFactory {
return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn,
scan, scanMetrics,
scannerId, resultCache, consumer, stub, loc, isRegionServerRemote,
priority,
scannerLeaseTimeoutPeriodNs, pauseNs, pauseNsForServerOverloaded,
maxAttempts,
- scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, requestAttributes);
+ scanTimeoutNs, rpcTimeoutNs, lastNextCallNanos, startLogErrorsCnt,
requestAttributes);
}
/**
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index 428a3c65507..55e09a53ffe 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
+import static
org.apache.hadoop.hbase.client.ConnectionUtils.incMillisBetweenNextsMetrics;
import static
org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
import static
org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
import static
org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
@@ -120,7 +121,9 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private boolean includeNextStartRowWhenError;
- private long nextCallStartNs;
+ private long lastNextCallNanos;
+
+ private long nextCallStartNanos;
private int tries;
@@ -334,7 +337,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc,
boolean isRegionServerRemote, int priority, long
scannerLeaseTimeoutPeriodNs, long pauseNs,
long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long
rpcTimeoutNs,
- int startLogErrorsCnt, Map<String, byte[]> requestAttributes) {
+ long lastNextCallNanos, int startLogErrorsCnt, Map<String, byte[]>
requestAttributes) {
this.retryTimer = retryTimer;
this.conn = conn;
this.scan = scan;
@@ -349,6 +352,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
this.maxAttempts = maxAttempts;
this.scanTimeoutNs = scanTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
+ this.lastNextCallNanos = lastNextCallNanos;
this.startLogErrorsCnt = startLogErrorsCnt;
if (scan.isReversed()) {
completeWhenNoMoreResultsInRegion =
this::completeReversedWhenNoMoreResultsInRegion;
@@ -365,8 +369,12 @@ class AsyncScanSingleRegionRpcRetryingCaller {
new HBaseServerExceptionPauseManager(pauseNs,
pauseNsForServerOverloaded, scanTimeoutNs);
}
+ public long getLastNextCallNanos() {
+ return lastNextCallNanos;
+ }
+
private long elapsedMs() {
- return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs);
+ return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
nextCallStartNanos);
}
private void closeScanner() {
@@ -433,7 +441,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
}
OptionalLong maybePauseNsToUse =
- pauseManager.getPauseNsFromException(error, tries, nextCallStartNs);
+ pauseManager.getPauseNsFromException(error, tries, nextCallStartNanos);
if (!maybePauseNsToUse.isPresent()) {
completeExceptionally(!scannerClosed);
return;
@@ -581,7 +589,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
// new one.
long callTimeoutNs;
if (scanTimeoutNs > 0) {
- long remainingNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
+ long remainingNs = scanTimeoutNs - (System.nanoTime() -
nextCallStartNanos);
if (remainingNs <= 0) {
completeExceptionally(true);
return;
@@ -609,7 +617,10 @@ class AsyncScanSingleRegionRpcRetryingCaller {
nextCallSeq++;
tries = 1;
exceptions.clear();
- nextCallStartNs = System.nanoTime();
+ nextCallStartNanos = System.nanoTime();
+ incMillisBetweenNextsMetrics(scanMetrics,
+ TimeUnit.NANOSECONDS.toMillis(nextCallStartNanos - lastNextCallNanos));
+ lastNextCallNanos = nextCallStartNanos;
call();
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 21d78fdd148..7a3dd9761be 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -419,6 +419,13 @@ public final class ConnectionUtils {
scanMetrics.addToCounter(ScanMetrics.REGIONS_SCANNED_METRIC_NAME, 1);
}
+ static void incMillisBetweenNextsMetrics(ScanMetrics scanMetrics, long
millis) {
+ if (scanMetrics == null) {
+ return;
+ }
+ scanMetrics.addToCounter(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME,
millis);
+ }
+
/**
* Connect the two futures, if the src future is done, then mark the dst future as done. And if
* the dst future is done, then cancel the src future. This is used for timeline consistent read.
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java
index f9209c246d1..0a5be1b7f50 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java
@@ -18,13 +18,17 @@
package org.apache.hadoop.hbase.client;
import static
org.apache.hadoop.hbase.client.metrics.ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME;
-import static
org.apache.hadoop.hbase.client.metrics.ScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
import static
org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
import static
org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_CALLS_METRIC_NAME;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static
org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
@@ -33,7 +37,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
@@ -43,26 +49,19 @@ import
org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.params.provider.Arguments;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
-@RunWith(Parameterized.class)
-@Category({ MediumTests.class, ClientTests.class })
+@Tag(MediumTests.TAG)
+@Tag(ClientTests.TAG)
+@HBaseParameterizedTestTemplate(name = "{index}: scan={0}")
public class TestAsyncTableScanMetrics {
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestAsyncTableScanMetrics.class);
-
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static final TableName TABLE_NAME = TableName.valueOf("ScanMetrics");
@@ -82,24 +81,24 @@ public class TestAsyncTableScanMetrics {
Pair<List<Result>, ScanMetrics> scan(Scan scan) throws Exception;
}
- @Parameter(0)
- public String methodName;
+ private ScanWithMetrics method;
- @Parameter(1)
- public ScanWithMetrics method;
+ // methodName is just for naming
+ public TestAsyncTableScanMetrics(String methodName, ScanWithMetrics method) {
+ this.method = method;
+ }
- @Parameters(name = "{index}: scan={0}")
- public static List<Object[]> params() {
+ public static Stream<Arguments> parameters() {
ScanWithMetrics doScanWithRawAsyncTable =
TestAsyncTableScanMetrics::doScanWithRawAsyncTable;
ScanWithMetrics doScanWithAsyncTableScan =
TestAsyncTableScanMetrics::doScanWithAsyncTableScan;
ScanWithMetrics doScanWithAsyncTableScanner =
TestAsyncTableScanMetrics::doScanWithAsyncTableScanner;
- return Arrays.asList(new Object[] { "doScanWithRawAsyncTable",
doScanWithRawAsyncTable },
- new Object[] { "doScanWithAsyncTableScan", doScanWithAsyncTableScan },
- new Object[] { "doScanWithAsyncTableScanner",
doScanWithAsyncTableScanner });
+ return Stream.of(Arguments.of("doScanWithRawAsyncTable",
doScanWithRawAsyncTable),
+ Arguments.of("doScanWithAsyncTableScan", doScanWithAsyncTableScan),
+ Arguments.of("doScanWithAsyncTableScanner",
doScanWithAsyncTableScanner));
}
- @BeforeClass
+ @BeforeAll
public static void setUp() throws Exception {
UTIL.startMiniCluster(3);
// Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" and "zzz*" so that
@@ -113,7 +112,7 @@ public class TestAsyncTableScanMetrics {
NUM_REGIONS = UTIL.getHBaseCluster().getRegions(TABLE_NAME).size();
}
- @AfterClass
+ @AfterAll
public static void tearDown() throws Exception {
Closeables.close(CONN, true);
UTIL.shutdownMiniCluster();
@@ -149,7 +148,7 @@ public class TestAsyncTableScanMetrics {
}
}
- @Test
+ @TestTemplate
public void testScanMetricsDisabled() throws Exception {
Pair<List<Result>, ScanMetrics> pair = method.scan(new Scan());
assertEquals(3, pair.getFirst().size());
@@ -157,11 +156,13 @@ public class TestAsyncTableScanMetrics {
assertNull(pair.getSecond());
}
- @Test
+ @TestTemplate
public void testScanMetricsWithScanMetricsByRegionDisabled() throws
Exception {
Scan scan = new Scan();
scan.setScanMetricsEnabled(true);
+ long startNanos = System.nanoTime();
Pair<List<Result>, ScanMetrics> pair = method.scan(scan);
+ long endNanos = System.nanoTime();
List<Result> results = pair.getFirst();
assertEquals(3, results.size());
long bytes = getBytesOfResults(results);
@@ -171,9 +172,11 @@ public class TestAsyncTableScanMetrics {
assertEquals(NUM_REGIONS, scanMetrics.countOfRPCcalls.get());
// Assert scan metrics have not been collected by region
assertTrue(scanMetrics.collectMetricsByRegion().isEmpty());
+ assertThat(scanMetrics.sumOfMillisSecBetweenNexts.get(),
+
both(greaterThan(0L)).and(lessThan(TimeUnit.NANOSECONDS.toMillis(endNanos -
startNanos))));
}
- @Test
+ @TestTemplate
public void testScanMetricsByRegionForSingleRegionScan() throws Exception {
Scan scan = new Scan();
scan.withStartRow(Bytes.toBytes("zzz1"), true);
@@ -201,13 +204,17 @@ public class TestAsyncTableScanMetrics {
// was scanned.
assertEquals(scanMetrics.getMetricsMap(false), metrics);
}
+ // we only have 1 rpc call so there is no millis 'between nexts'
+ assertEquals(0, scanMetrics.sumOfMillisSecBetweenNexts.get());
}
- @Test
+ @TestTemplate
public void testScanMetricsByRegionForMultiRegionScan() throws Exception {
Scan scan = new Scan();
scan.setEnableScanMetricsByRegion(true);
+ long startNanos = System.nanoTime();
Pair<List<Result>, ScanMetrics> pair = method.scan(scan);
+ long endNanos = System.nanoTime();
List<Result> results = pair.getFirst();
assertEquals(3, results.size());
long bytes = getBytesOfResults(results);
@@ -241,6 +248,8 @@ public class TestAsyncTableScanMetrics {
}
}
assertEquals(3, rowsScannedAcrossAllRegions);
+ assertThat(scanMetrics.sumOfMillisSecBetweenNexts.get(),
+
both(greaterThan(0L)).and(lessThan(TimeUnit.NANOSECONDS.toMillis(endNanos -
startNanos))));
}
static long getBytesOfResults(List<Result> results) {