This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 8ac9040355e HBASE-29834 MILLIS_BETWEEN_NEXTS metric is not updated on 
branch-3+ (#7660) (#7850)
8ac9040355e is described below

commit 8ac9040355ede51bd4f5f029b66f8650230cc60f
Author: Duo Zhang <[email protected]>
AuthorDate: Thu Mar 5 22:33:14 2026 +0800

    HBASE-29834 MILLIS_BETWEEN_NEXTS metric is not updated on branch-3+ (#7660) 
(#7850)
    
    Reviewed-by: Richárd Antal <[email protected]>
    Signed-off-by: Viraj Jasani <[email protected]>
    
    (cherry picked from commit 26f9f0b1657291cad5215f9201624dbd2629ab67)
---
 .../hadoop/hbase/client/AsyncClientScanner.java    | 61 +++++++++-------
 .../client/AsyncRpcRetryingCallerFactory.java      |  9 ++-
 .../AsyncScanSingleRegionRpcRetryingCaller.java    | 23 ++++--
 .../hadoop/hbase/client/ConnectionUtils.java       |  7 ++
 .../hbase/client/TestAsyncTableScanMetrics.java    | 81 ++++++++++++----------
 5 files changed, 112 insertions(+), 69 deletions(-)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
index d58c8e60c8d..8d3bcb000bc 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
@@ -21,6 +21,7 @@ import static 
org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
 import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
 import static 
org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType;
+import static 
org.apache.hadoop.hbase.client.ConnectionUtils.incMillisBetweenNextsMetrics;
 import static 
org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
 import static 
org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
 import static 
org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics;
@@ -202,40 +203,42 @@ class AsyncClientScanner {
     }
   }
 
-  private void startScan(OpenScannerResponse resp) {
-    addListener(
+  // lastNextCallNanos is used to calculate the MILLIS_BETWEEN_NEXTS scan 
metrics
+  private void startScan(OpenScannerResponse resp, long lastNextCallNanos) {
+    AsyncScanSingleRegionRpcRetryingCaller scanSingleRegionCaller =
       
conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
         .remote(resp.isRegionServerRemote)
         .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), 
TimeUnit.MILLISECONDS).stub(resp.stub)
         
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
         .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
         .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, 
TimeUnit.NANOSECONDS)
+        .lastNextCallNanos(lastNextCallNanos)
         .pauseForServerOverloaded(pauseNsForServerOverloaded, 
TimeUnit.NANOSECONDS)
         .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
-        .setRequestAttributes(requestAttributes).start(resp.controller, 
resp.resp),
-      (hasMore, error) -> {
-        try (Scope ignored = span.makeCurrent()) {
-          if (error != null) {
-            try {
-              consumer.onError(error);
-              return;
-            } finally {
-              TraceUtil.setError(span, error);
-              span.end();
-            }
+        .setRequestAttributes(requestAttributes).build();
+    addListener(scanSingleRegionCaller.start(resp.controller, resp.resp), 
(hasMore, error) -> {
+      try (Scope ignored = span.makeCurrent()) {
+        if (error != null) {
+          try {
+            consumer.onError(error);
+            return;
+          } finally {
+            TraceUtil.setError(span, error);
+            span.end();
           }
-          if (hasMore) {
-            openScanner();
-          } else {
-            try {
-              consumer.onComplete();
-            } finally {
-              span.setStatus(StatusCode.OK);
-              span.end();
-            }
+        }
+        if (hasMore) {
+          openScanner(scanSingleRegionCaller);
+        } else {
+          try {
+            consumer.onComplete();
+          } finally {
+            span.setStatus(StatusCode.OK);
+            span.end();
           }
         }
-      });
+      }
+    });
   }
 
   private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
@@ -256,11 +259,17 @@ class AsyncClientScanner {
       : conn.connConf.getPrimaryScanTimeoutNs();
   }
 
-  private void openScanner() {
+  private void openScanner(AsyncScanSingleRegionRpcRetryingCaller 
previousScanSingleRegionCaller) {
     if (this.isScanMetricsByRegionEnabled) {
       scanMetrics.moveToNextRegion();
     }
     incRegionCountMetrics(scanMetrics);
+    long openScannerStartNanos = System.nanoTime();
+    if (previousScanSingleRegionCaller != null) {
+      // open scanner is also a next call
+      incMillisBetweenNextsMetrics(scanMetrics, TimeUnit.NANOSECONDS
+        .toMillis(openScannerStartNanos - 
previousScanSingleRegionCaller.getLastNextCallNanos()));
+    }
     openScannerTries.set(1);
     addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, 
scan.getStartRow(),
       getLocateType(scan), this::openScanner, rpcTimeoutNs, 
getPrimaryTimeoutNs(), retryTimer,
@@ -280,14 +289,14 @@ class AsyncClientScanner {
             
this.scanMetrics.initScanMetricsRegionInfo(loc.getRegion().getEncodedName(),
               loc.getServerName());
           }
-          startScan(resp);
+          startScan(resp, openScannerStartNanos);
         }
       });
   }
 
   public void start() {
     try (Scope ignored = span.makeCurrent()) {
-      openScanner();
+      openScanner(null);
     }
   }
 }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index 1ea2a1ad7dd..50d9d36a2b5 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -209,6 +209,8 @@ class AsyncRpcRetryingCallerFactory {
 
     private long rpcTimeoutNs;
 
+    private long lastNextCallNanos = System.nanoTime();
+
     private int priority = PRIORITY_UNSET;
 
     private Map<String, byte[]> requestAttributes = Collections.emptyMap();
@@ -275,6 +277,11 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
+    public ScanSingleRegionCallerBuilder lastNextCallNanos(long nanos) {
+      this.lastNextCallNanos = nanos;
+      return this;
+    }
+
     public ScanSingleRegionCallerBuilder pauseForServerOverloaded(long pause, 
TimeUnit unit) {
       this.pauseNsForServerOverloaded = unit.toNanos(pause);
       return this;
@@ -311,7 +318,7 @@ class AsyncRpcRetryingCallerFactory {
       return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, 
scan, scanMetrics,
         scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, 
priority,
         scannerLeaseTimeoutPeriodNs, pauseNs, pauseNsForServerOverloaded, 
maxAttempts,
-        scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, requestAttributes);
+        scanTimeoutNs, rpcTimeoutNs, lastNextCallNanos, startLogErrorsCnt, 
requestAttributes);
     }
 
     /**
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index 428a3c65507..55e09a53ffe 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static 
org.apache.hadoop.hbase.client.ConnectionUtils.incMillisBetweenNextsMetrics;
 import static 
org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
 import static 
org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
 import static 
org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
@@ -120,7 +121,9 @@ class AsyncScanSingleRegionRpcRetryingCaller {
 
   private boolean includeNextStartRowWhenError;
 
-  private long nextCallStartNs;
+  private long lastNextCallNanos;
+
+  private long nextCallStartNanos;
 
   private int tries;
 
@@ -334,7 +337,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc,
     boolean isRegionServerRemote, int priority, long 
scannerLeaseTimeoutPeriodNs, long pauseNs,
     long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long 
rpcTimeoutNs,
-    int startLogErrorsCnt, Map<String, byte[]> requestAttributes) {
+    long lastNextCallNanos, int startLogErrorsCnt, Map<String, byte[]> 
requestAttributes) {
     this.retryTimer = retryTimer;
     this.conn = conn;
     this.scan = scan;
@@ -349,6 +352,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     this.maxAttempts = maxAttempts;
     this.scanTimeoutNs = scanTimeoutNs;
     this.rpcTimeoutNs = rpcTimeoutNs;
+    this.lastNextCallNanos = lastNextCallNanos;
     this.startLogErrorsCnt = startLogErrorsCnt;
     if (scan.isReversed()) {
       completeWhenNoMoreResultsInRegion = 
this::completeReversedWhenNoMoreResultsInRegion;
@@ -365,8 +369,12 @@ class AsyncScanSingleRegionRpcRetryingCaller {
       new HBaseServerExceptionPauseManager(pauseNs, 
pauseNsForServerOverloaded, scanTimeoutNs);
   }
 
+  public long getLastNextCallNanos() {
+    return lastNextCallNanos;
+  }
+
   private long elapsedMs() {
-    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs);
+    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
nextCallStartNanos);
   }
 
   private void closeScanner() {
@@ -433,7 +441,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     }
 
     OptionalLong maybePauseNsToUse =
-      pauseManager.getPauseNsFromException(error, tries, nextCallStartNs);
+      pauseManager.getPauseNsFromException(error, tries, nextCallStartNanos);
     if (!maybePauseNsToUse.isPresent()) {
       completeExceptionally(!scannerClosed);
       return;
@@ -581,7 +589,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     // new one.
     long callTimeoutNs;
     if (scanTimeoutNs > 0) {
-      long remainingNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
+      long remainingNs = scanTimeoutNs - (System.nanoTime() - 
nextCallStartNanos);
       if (remainingNs <= 0) {
         completeExceptionally(true);
         return;
@@ -609,7 +617,10 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     nextCallSeq++;
     tries = 1;
     exceptions.clear();
-    nextCallStartNs = System.nanoTime();
+    nextCallStartNanos = System.nanoTime();
+    incMillisBetweenNextsMetrics(scanMetrics,
+      TimeUnit.NANOSECONDS.toMillis(nextCallStartNanos - lastNextCallNanos));
+    lastNextCallNanos = nextCallStartNanos;
     call();
   }
 
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 21d78fdd148..7a3dd9761be 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -419,6 +419,13 @@ public final class ConnectionUtils {
     scanMetrics.addToCounter(ScanMetrics.REGIONS_SCANNED_METRIC_NAME, 1);
   }
 
+  static void incMillisBetweenNextsMetrics(ScanMetrics scanMetrics, long 
millis) {
+    if (scanMetrics == null) {
+      return;
+    }
+    scanMetrics.addToCounter(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME, 
millis);
+  }
+
   /**
    * Connect the two futures, if the src future is done, then mark the dst 
future as done. And if
    * the dst future is done, then cancel the src future. This is used for 
timeline consistent read.
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java
index f9209c246d1..0a5be1b7f50 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java
@@ -18,13 +18,17 @@
 package org.apache.hadoop.hbase.client;
 
 import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME;
-import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
 import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
 import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_CALLS_METRIC_NAME;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static 
org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -33,7 +37,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ForkJoinPool;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
@@ -43,26 +49,19 @@ import 
org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.params.provider.Arguments;
 
 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
 
-@RunWith(Parameterized.class)
-@Category({ MediumTests.class, ClientTests.class })
+@Tag(MediumTests.TAG)
+@Tag(ClientTests.TAG)
+@HBaseParameterizedTestTemplate(name = "{index}: scan={0}")
 public class TestAsyncTableScanMetrics {
 
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestAsyncTableScanMetrics.class);
-
   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
 
   private static final TableName TABLE_NAME = TableName.valueOf("ScanMetrics");
@@ -82,24 +81,24 @@ public class TestAsyncTableScanMetrics {
     Pair<List<Result>, ScanMetrics> scan(Scan scan) throws Exception;
   }
 
-  @Parameter(0)
-  public String methodName;
+  private ScanWithMetrics method;
 
-  @Parameter(1)
-  public ScanWithMetrics method;
+  // methodName is just for naming
+  public TestAsyncTableScanMetrics(String methodName, ScanWithMetrics method) {
+    this.method = method;
+  }
 
-  @Parameters(name = "{index}: scan={0}")
-  public static List<Object[]> params() {
+  public static Stream<Arguments> parameters() {
     ScanWithMetrics doScanWithRawAsyncTable = 
TestAsyncTableScanMetrics::doScanWithRawAsyncTable;
     ScanWithMetrics doScanWithAsyncTableScan = 
TestAsyncTableScanMetrics::doScanWithAsyncTableScan;
     ScanWithMetrics doScanWithAsyncTableScanner =
       TestAsyncTableScanMetrics::doScanWithAsyncTableScanner;
-    return Arrays.asList(new Object[] { "doScanWithRawAsyncTable", 
doScanWithRawAsyncTable },
-      new Object[] { "doScanWithAsyncTableScan", doScanWithAsyncTableScan },
-      new Object[] { "doScanWithAsyncTableScanner", 
doScanWithAsyncTableScanner });
+    return Stream.of(Arguments.of("doScanWithRawAsyncTable", 
doScanWithRawAsyncTable),
+      Arguments.of("doScanWithAsyncTableScan", doScanWithAsyncTableScan),
+      Arguments.of("doScanWithAsyncTableScanner", 
doScanWithAsyncTableScanner));
   }
 
-  @BeforeClass
+  @BeforeAll
   public static void setUp() throws Exception {
     UTIL.startMiniCluster(3);
     // Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" 
and "zzz*" so that
@@ -113,7 +112,7 @@ public class TestAsyncTableScanMetrics {
     NUM_REGIONS = UTIL.getHBaseCluster().getRegions(TABLE_NAME).size();
   }
 
-  @AfterClass
+  @AfterAll
   public static void tearDown() throws Exception {
     Closeables.close(CONN, true);
     UTIL.shutdownMiniCluster();
@@ -149,7 +148,7 @@ public class TestAsyncTableScanMetrics {
     }
   }
 
-  @Test
+  @TestTemplate
   public void testScanMetricsDisabled() throws Exception {
     Pair<List<Result>, ScanMetrics> pair = method.scan(new Scan());
     assertEquals(3, pair.getFirst().size());
@@ -157,11 +156,13 @@ public class TestAsyncTableScanMetrics {
     assertNull(pair.getSecond());
   }
 
-  @Test
+  @TestTemplate
   public void testScanMetricsWithScanMetricsByRegionDisabled() throws 
Exception {
     Scan scan = new Scan();
     scan.setScanMetricsEnabled(true);
+    long startNanos = System.nanoTime();
     Pair<List<Result>, ScanMetrics> pair = method.scan(scan);
+    long endNanos = System.nanoTime();
     List<Result> results = pair.getFirst();
     assertEquals(3, results.size());
     long bytes = getBytesOfResults(results);
@@ -171,9 +172,11 @@ public class TestAsyncTableScanMetrics {
     assertEquals(NUM_REGIONS, scanMetrics.countOfRPCcalls.get());
     // Assert scan metrics have not been collected by region
     assertTrue(scanMetrics.collectMetricsByRegion().isEmpty());
+    assertThat(scanMetrics.sumOfMillisSecBetweenNexts.get(),
+      
both(greaterThan(0L)).and(lessThan(TimeUnit.NANOSECONDS.toMillis(endNanos - 
startNanos))));
   }
 
-  @Test
+  @TestTemplate
   public void testScanMetricsByRegionForSingleRegionScan() throws Exception {
     Scan scan = new Scan();
     scan.withStartRow(Bytes.toBytes("zzz1"), true);
@@ -201,13 +204,17 @@ public class TestAsyncTableScanMetrics {
       // was scanned.
       assertEquals(scanMetrics.getMetricsMap(false), metrics);
     }
+    // we only have 1 rpc call so there is no millis 'between nexts'
+    assertEquals(0, scanMetrics.sumOfMillisSecBetweenNexts.get());
   }
 
-  @Test
+  @TestTemplate
   public void testScanMetricsByRegionForMultiRegionScan() throws Exception {
     Scan scan = new Scan();
     scan.setEnableScanMetricsByRegion(true);
+    long startNanos = System.nanoTime();
     Pair<List<Result>, ScanMetrics> pair = method.scan(scan);
+    long endNanos = System.nanoTime();
     List<Result> results = pair.getFirst();
     assertEquals(3, results.size());
     long bytes = getBytesOfResults(results);
@@ -241,6 +248,8 @@ public class TestAsyncTableScanMetrics {
       }
     }
     assertEquals(3, rowsScannedAcrossAllRegions);
+    assertThat(scanMetrics.sumOfMillisSecBetweenNexts.get(),
+      
both(greaterThan(0L)).and(lessThan(TimeUnit.NANOSECONDS.toMillis(endNanos - 
startNanos))));
   }
 
   static long getBytesOfResults(List<Result> results) {

Reply via email to