This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new ca293f89dcd HBASE-29233: Capture scan metrics at region level (#6868)
ca293f89dcd is described below

commit ca293f89dcdadc60d082cce1a9bfc6f631515831
Author: sanjeet006py <[email protected]>
AuthorDate: Fri Jun 27 00:52:32 2025 +0530

    HBASE-29233: Capture scan metrics at region level (#6868)
    
    Signed-off-by: Viraj Jasani <[email protected]>
---
 .gitignore                                         |   1 +
 .../hadoop/hbase/client/AbstractClientScanner.java |  12 +
 .../hadoop/hbase/client/AsyncClientScanner.java    |  15 +
 .../hadoop/hbase/client/ConnectionUtils.java       |  14 +-
 .../apache/hadoop/hbase/client/ImmutableScan.java  |  11 +
 .../java/org/apache/hadoop/hbase/client/Scan.java  |  26 +-
 .../client/metrics/RegionScanMetricsData.java      |  77 +++
 .../hadoop/hbase/client/metrics/ScanMetrics.java   |  16 +-
 .../client/metrics/ScanMetricsRegionInfo.java      |  81 +++
 .../hbase/client/metrics/ScanMetricsUtil.java      |  88 +++
 .../client/metrics/ServerSideScanMetrics.java      | 121 +++-
 .../hbase/client/ClientSideRegionScanner.java      |  11 +-
 .../hadoop/hbase/client/TableSnapshotScanner.java  |   3 +-
 .../hadoop/hbase/regionserver/RSRpcServices.java   |   9 +-
 .../hbase/regionserver/RegionScannerImpl.java      |   7 +-
 .../hbase/client/TestAsyncTableScanMetrics.java    | 111 +++-
 ...AsyncTableScanMetricsWithScannerSuspending.java | 162 +++++
 .../hbase/client/TestClientSideRegionScanner.java  |  85 +++
 .../hadoop/hbase/client/TestReplicasClient.java    |  75 +++
 .../hadoop/hbase/client/TestScanAttributes.java    |  50 ++
 .../hadoop/hbase/client/TestTableScanMetrics.java  | 697 +++++++++++++++++++++
 .../hbase/client/TestTableSnapshotScanner.java     |  84 +++
 22 files changed, 1701 insertions(+), 55 deletions(-)

diff --git a/.gitignore b/.gitignore
index 52d169dd5ad..da26fb98456 100644
--- a/.gitignore
+++ b/.gitignore
@@ -25,3 +25,4 @@ linklint/
 **/*.log
 tmp
 **/.flattened-pom.xml
+.vscode/
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
index 48cec12f43c..ece657deb0a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
@@ -26,6 +26,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public abstract class AbstractClientScanner implements ResultScanner {
   protected ScanMetrics scanMetrics;
+  private boolean isScanMetricsByRegionEnabled = false;
 
   /**
    * Check and initialize if application wants to collect scan metrics
@@ -34,6 +35,9 @@ public abstract class AbstractClientScanner implements 
ResultScanner {
     // check if application wants to collect scan metrics
     if (scan.isScanMetricsEnabled()) {
       scanMetrics = new ScanMetrics();
+      if (scan.isScanMetricsByRegionEnabled()) {
+        isScanMetricsByRegionEnabled = true;
+      }
     }
   }
 
@@ -46,4 +50,12 @@ public abstract class AbstractClientScanner implements 
ResultScanner {
   public ScanMetrics getScanMetrics() {
     return scanMetrics;
   }
+
+  protected void setIsScanMetricsByRegionEnabled(boolean 
isScanMetricsByRegionEnabled) {
+    this.isScanMetricsByRegionEnabled = isScanMetricsByRegionEnabled;
+  }
+
+  protected boolean isScanMetricsByRegionEnabled() {
+    return isScanMetricsByRegionEnabled;
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
index b61f5b80c9e..d58c8e60c8d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
@@ -95,6 +95,8 @@ class AsyncClientScanner {
 
   private final Map<String, byte[]> requestAttributes;
 
+  private final boolean isScanMetricsByRegionEnabled;
+
   public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, 
TableName tableName,
     AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long 
pauseNsForServerOverloaded,
     int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int 
startLogErrorsCnt,
@@ -118,12 +120,17 @@ class AsyncClientScanner {
     this.startLogErrorsCnt = startLogErrorsCnt;
     this.resultCache = createScanResultCache(scan);
     this.requestAttributes = requestAttributes;
+    boolean isScanMetricsByRegionEnabled = false;
     if (scan.isScanMetricsEnabled()) {
       this.scanMetrics = new ScanMetrics();
       consumer.onScanMetricsCreated(scanMetrics);
+      if (this.scan.isScanMetricsByRegionEnabled()) {
+        isScanMetricsByRegionEnabled = true;
+      }
     } else {
       this.scanMetrics = null;
     }
+    this.isScanMetricsByRegionEnabled = isScanMetricsByRegionEnabled;
 
     /*
      * Assumes that the `start()` method is called immediately after 
construction. If this is no
@@ -250,6 +257,9 @@ class AsyncClientScanner {
   }
 
   private void openScanner() {
+    if (this.isScanMetricsByRegionEnabled) {
+      scanMetrics.moveToNextRegion();
+    }
     incRegionCountMetrics(scanMetrics);
     openScannerTries.set(1);
     addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, 
scan.getStartRow(),
@@ -265,6 +275,11 @@ class AsyncClientScanner {
               span.end();
             }
           }
+          if (this.isScanMetricsByRegionEnabled) {
+            HRegionLocation loc = resp.loc;
+            
this.scanMetrics.initScanMetricsRegionInfo(loc.getRegion().getEncodedName(),
+              loc.getServerName());
+          }
           startScan(resp);
         }
       });
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index cdb85845f03..40e205ddca8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -341,9 +341,9 @@ public final class ConnectionUtils {
     if (scanMetrics == null) {
       return;
     }
-    scanMetrics.countOfRPCcalls.incrementAndGet();
+    scanMetrics.addToCounter(ScanMetrics.RPC_CALLS_METRIC_NAME, 1);
     if (isRegionServerRemote) {
-      scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
+      scanMetrics.addToCounter(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME, 1);
     }
   }
 
@@ -351,9 +351,9 @@ public final class ConnectionUtils {
     if (scanMetrics == null) {
       return;
     }
-    scanMetrics.countOfRPCRetries.incrementAndGet();
+    scanMetrics.addToCounter(ScanMetrics.RPC_RETRIES_METRIC_NAME, 1);
     if (isRegionServerRemote) {
-      scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
+      scanMetrics.addToCounter(ScanMetrics.REMOTE_RPC_RETRIES_METRIC_NAME, 1);
     }
   }
 
@@ -368,9 +368,9 @@ public final class ConnectionUtils {
         resultSize += PrivateCellUtil.estimatedSerializedSizeOf(cell);
       }
     }
-    scanMetrics.countOfBytesInResults.addAndGet(resultSize);
+    scanMetrics.addToCounter(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME, 
resultSize);
     if (isRegionServerRemote) {
-      scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
+      
scanMetrics.addToCounter(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME, 
resultSize);
     }
   }
 
@@ -390,7 +390,7 @@ public final class ConnectionUtils {
     if (scanMetrics == null) {
       return;
     }
-    scanMetrics.countOfRegions.incrementAndGet();
+    scanMetrics.addToCounter(ScanMetrics.REGIONS_SCANNED_METRIC_NAME, 1);
   }
 
   /**
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ImmutableScan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ImmutableScan.java
index 128d46daac4..eafab0e7fd8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ImmutableScan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ImmutableScan.java
@@ -228,6 +228,12 @@ public final class ImmutableScan extends Scan {
       "ImmutableScan does not allow access to setScanMetricsEnabled");
   }
 
+  @Override
+  public Scan setEnableScanMetricsByRegion(final boolean enable) {
+    throw new UnsupportedOperationException(
+      "ImmutableScan does not allow access to setEnableScanMetricsByRegion");
+  }
+
   @Override
   @Deprecated
   public Scan setAsyncPrefetch(boolean asyncPrefetch) {
@@ -402,6 +408,11 @@ public final class ImmutableScan extends Scan {
     return this.delegateScan.isScanMetricsEnabled();
   }
 
+  @Override
+  public boolean isScanMetricsByRegionEnabled() {
+    return this.delegateScan.isScanMetricsByRegionEnabled();
+  }
+
   @Override
   public Boolean isAsyncPrefetch() {
     return this.delegateScan.isAsyncPrefetch();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index 62a65e4e6e1..5cc0d0fac87 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -115,6 +115,8 @@ public class Scan extends Query {
   // define this attribute with the appropriate table name by calling
   // scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, 
Bytes.toBytes(tableName))
   static public final String SCAN_ATTRIBUTES_TABLE_NAME = 
"scan.attributes.table.name";
+  static private final String SCAN_ATTRIBUTES_METRICS_BY_REGION_ENABLE =
+    "scan.attributes.metrics.byregion.enable";
 
   /**
    * -1 means no caching specified and the value of {@link 
HConstants#HBASE_CLIENT_SCANNER_CACHING}
@@ -905,11 +907,15 @@ public class Scan extends Query {
   }
 
   /**
-   * Enable collection of {@link ScanMetrics}. For advanced users.
+   * Enable collection of {@link ScanMetrics}. For advanced users. While 
disabling scan metrics,
+   * will also disable region level scan metrics.
    * @param enabled Set to true to enable accumulating scan metrics
    */
   public Scan setScanMetricsEnabled(final boolean enabled) {
     setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, 
Bytes.toBytes(Boolean.valueOf(enabled)));
+    if (!enabled) {
+      setEnableScanMetricsByRegion(false);
+    }
     return this;
   }
 
@@ -1033,4 +1039,22 @@ public class Scan extends Query {
   public static Scan createScanFromCursor(Cursor cursor) {
     return new Scan().withStartRow(cursor.getRow());
   }
+
+  /**
+   * Enables region level scan metrics. If scan metrics are disabled then 
first enables scan metrics
+   * followed by region level scan metrics.
+   * @param enable Set to true to enable region level scan metrics.
+   */
+  public Scan setEnableScanMetricsByRegion(final boolean enable) {
+    if (enable) {
+      setScanMetricsEnabled(true);
+    }
+    setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_BY_REGION_ENABLE, 
Bytes.toBytes(enable));
+    return this;
+  }
+
+  public boolean isScanMetricsByRegionEnabled() {
+    byte[] attr = getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_BY_REGION_ENABLE);
+    return attr != null && Bytes.toBoolean(attr);
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/RegionScanMetricsData.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/RegionScanMetricsData.java
new file mode 100644
index 00000000000..ca6a111175e
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/RegionScanMetricsData.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Captures region level scan metrics as a map of metric name ({@link String}) 
-> Value
+ * ({@link AtomicLong}). <br/>
+ * <br/>
+ * One instance stores scan metrics for a single region only.
+ */
+@InterfaceAudience.Private
+public class RegionScanMetricsData {
+  private final Map<String, AtomicLong> counters = new HashMap<>();
+  private ScanMetricsRegionInfo scanMetricsRegionInfo =
+    ScanMetricsRegionInfo.EMPTY_SCAN_METRICS_REGION_INFO;
+
+  AtomicLong createCounter(String counterName) {
+    return ScanMetricsUtil.createCounter(counters, counterName);
+  }
+
+  void setCounter(String counterName, long value) {
+    ScanMetricsUtil.setCounter(counters, counterName, value);
+  }
+
+  void addToCounter(String counterName, long delta) {
+    ScanMetricsUtil.addToCounter(counters, counterName, delta);
+  }
+
+  Map<String, Long> collectMetrics(boolean reset) {
+    return ScanMetricsUtil.collectMetrics(counters, reset);
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + "[" + scanMetricsRegionInfo + "," + 
"Counters=" + counters
+      + "]";
+  }
+
+  /**
+   * Populate encoded region name and server name details if not already 
populated. If details are
+   * already populated and a re-attempt is done then {@link 
UnsupportedOperationException} is
+   * thrown.
+   */
+  void initScanMetricsRegionInfo(String encodedRegionName, ServerName 
serverName) {
+    // Check by reference
+    if (scanMetricsRegionInfo == 
ScanMetricsRegionInfo.EMPTY_SCAN_METRICS_REGION_INFO) {
+      scanMetricsRegionInfo = new ScanMetricsRegionInfo(encodedRegionName, 
serverName);
+    } else {
+      throw new UnsupportedOperationException("ScanMetricsRegionInfo has 
already been initialized");
+    }
+  }
+
+  ScanMetricsRegionInfo getScanMetricsRegionInfo() {
+    return scanMetricsRegionInfo;
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java
index e2038a17b37..8617a851509 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java
@@ -96,8 +96,22 @@ public class ScanMetrics extends ServerSideScanMetrics {
   public final AtomicLong countOfRemoteRPCRetries = 
createCounter(REMOTE_RPC_RETRIES_METRIC_NAME);
 
   /**
-   * constructor
+   * Constructor
    */
   public ScanMetrics() {
   }
+
+  @Override
+  public void moveToNextRegion() {
+    super.moveToNextRegion();
+    currentRegionScanMetricsData.createCounter(RPC_CALLS_METRIC_NAME);
+    currentRegionScanMetricsData.createCounter(REMOTE_RPC_CALLS_METRIC_NAME);
+    
currentRegionScanMetricsData.createCounter(MILLIS_BETWEEN_NEXTS_METRIC_NAME);
+    
currentRegionScanMetricsData.createCounter(NOT_SERVING_REGION_EXCEPTION_METRIC_NAME);
+    currentRegionScanMetricsData.createCounter(BYTES_IN_RESULTS_METRIC_NAME);
+    
currentRegionScanMetricsData.createCounter(BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
+    currentRegionScanMetricsData.createCounter(REGIONS_SCANNED_METRIC_NAME);
+    currentRegionScanMetricsData.createCounter(RPC_RETRIES_METRIC_NAME);
+    currentRegionScanMetricsData.createCounter(REMOTE_RPC_RETRIES_METRIC_NAME);
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetricsRegionInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetricsRegionInfo.java
new file mode 100644
index 00000000000..72fb0ab418d
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetricsRegionInfo.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.metrics;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * POJO for capturing region level details when region level scan metrics are 
enabled. <br>
+ * <br>
+ * Currently, encoded region name and server name (host name, ports and 
startcode) are captured as
+ * region details. <br>
+ * <br>
+ * Instance of this class serves as key in the Map returned by
+ * {@link ServerSideScanMetrics#collectMetricsByRegion()} or
+ * {@link ServerSideScanMetrics#collectMetricsByRegion(boolean)}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ScanMetricsRegionInfo {
+  /**
+   * Users should only compare against this constant by reference and should 
not make any
+   * assumptions regarding content of the constant.
+   */
+  public static final ScanMetricsRegionInfo EMPTY_SCAN_METRICS_REGION_INFO =
+    new ScanMetricsRegionInfo(null, null);
+
+  private final String encodedRegionName;
+  private final ServerName serverName;
+
+  ScanMetricsRegionInfo(String encodedRegionName, ServerName serverName) {
+    this.encodedRegionName = encodedRegionName;
+    this.serverName = serverName;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof ScanMetricsRegionInfo other)) {
+      return false;
+    }
+    return new EqualsBuilder().append(encodedRegionName, 
other.encodedRegionName)
+      .append(serverName, other.serverName).isEquals();
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(17, 
37).append(encodedRegionName).append(serverName).toHashCode();
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + "[encodedRegionName=" + 
encodedRegionName + ",serverName="
+      + serverName + "]";
+  }
+
+  public String getEncodedRegionName() {
+    return encodedRegionName;
+  }
+
+  public ServerName getServerName() {
+    return serverName;
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetricsUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetricsUtil.java
new file mode 100644
index 00000000000..2e1dfb8a3c4
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetricsUtil.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public final class ScanMetricsUtil {
+
+  private ScanMetricsUtil() {
+  }
+
+  /**
+   * Creates a new counter with the specified name and stores it in the 
counters map.
+   * @return {@link AtomicLong} instance for the counter with counterName
+   */
+  static AtomicLong createCounter(Map<String, AtomicLong> counters, String 
counterName) {
+    AtomicLong c = new AtomicLong(0);
+    counters.put(counterName, c);
+    return c;
+  }
+
+  /**
+   * Sets counter with counterName to passed in value, does nothing if counter 
does not exist.
+   */
+  static void setCounter(Map<String, AtomicLong> counters, String counterName, 
long value) {
+    AtomicLong c = counters.get(counterName);
+    if (c != null) {
+      c.set(value);
+    }
+  }
+
+  /**
+   * Increments the counter with counterName by delta, does nothing if counter 
does not exist.
+   */
+  static void addToCounter(Map<String, AtomicLong> counters, String 
counterName, long delta) {
+    AtomicLong c = counters.get(counterName);
+    if (c != null) {
+      c.addAndGet(delta);
+    }
+  }
+
+  /**
+   * Returns true if a counter exists with the counterName.
+   */
+  static boolean hasCounter(Map<String, AtomicLong> counters, String 
counterName) {
+    return counters.containsKey(counterName);
+  }
+
+  /**
+   * Returns {@link AtomicLong} instance for this counter name, null if 
counter does not exist.
+   */
+  static AtomicLong getCounter(Map<String, AtomicLong> counters, String 
counterName) {
+    return counters.get(counterName);
+  }
+
+  /**
+   * Get all the values. If reset is true, we will reset the all AtomicLongs 
back to 0.
+   * @param reset whether to reset the AtomicLongs to 0.
+   * @return A Map of String -> Long for metrics
+   */
+  static Map<String, Long> collectMetrics(Map<String, AtomicLong> counters, 
boolean reset) {
+    Map<String, Long> metricsSnapshot = new HashMap<>();
+    for (Map.Entry<String, AtomicLong> e : counters.entrySet()) {
+      long value = reset ? e.getValue().getAndSet(0) : e.getValue().get();
+      metricsSnapshot.put(e.getKey(), value);
+    }
+    return metricsSnapshot;
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
index ff83584ccb4..ecc2d9bee14 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
@@ -17,9 +17,13 @@
  */
 package org.apache.hadoop.hbase.client.metrics;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
@@ -31,18 +35,31 @@ import 
org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 @SuppressWarnings("checkstyle:VisibilityModifier") // See HBASE-27757
 public class ServerSideScanMetrics {
   /**
-   * Hash to hold the String -&gt; Atomic Long mappings for each metric
+   * Hash to hold the String -> Atomic Long mappings for each metric
    */
   private final Map<String, AtomicLong> counters = new HashMap<>();
+  private final List<RegionScanMetricsData> regionScanMetricsData = new 
ArrayList<>(0);
+  protected RegionScanMetricsData currentRegionScanMetricsData = null;
 
   /**
-   * Create a new counter with the specified name
+   * If region level scan metrics are enabled, must call this method to start 
collecting metrics for
+   * the region before scanning the region.
+   */
+  public void moveToNextRegion() {
+    currentRegionScanMetricsData = new RegionScanMetricsData();
+    regionScanMetricsData.add(currentRegionScanMetricsData);
+    
currentRegionScanMetricsData.createCounter(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
+    
currentRegionScanMetricsData.createCounter(COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME);
+    
currentRegionScanMetricsData.createCounter(BLOCK_BYTES_SCANNED_KEY_METRIC_NAME);
+    currentRegionScanMetricsData.createCounter(FS_READ_TIME_METRIC_NAME);
+  }
+
+  /**
+   * Create a new counter with the specified name.
    * @return {@link AtomicLong} instance for the counter with counterName
    */
   protected AtomicLong createCounter(String counterName) {
-    AtomicLong c = new AtomicLong(0);
-    counters.put(counterName, c);
-    return c;
+    return ScanMetricsUtil.createCounter(counters, counterName);
   }
 
   public static final String COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME = 
"ROWS_SCANNED";
@@ -69,52 +86,106 @@ public class ServerSideScanMetrics {
 
   public final AtomicLong fsReadTime = createCounter(FS_READ_TIME_METRIC_NAME);
 
+  /**
+   * Sets counter with counterName to passed in value, does nothing if counter 
does not exist. If
+   * region level scan metrics are enabled then sets the value of counter for 
the current region
+   * being scanned.
+   */
   public void setCounter(String counterName, long value) {
-    AtomicLong c = this.counters.get(counterName);
-    if (c != null) {
-      c.set(value);
+    ScanMetricsUtil.setCounter(counters, counterName, value);
+    if (this.currentRegionScanMetricsData != null) {
+      this.currentRegionScanMetricsData.setCounter(counterName, value);
     }
   }
 
-  /** Returns true if a counter exists with the counterName */
+  /**
+   * Returns true if a counter exists with the counterName.
+   */
   public boolean hasCounter(String counterName) {
-    return this.counters.containsKey(counterName);
+    return ScanMetricsUtil.hasCounter(counters, counterName);
   }
 
-  /** Returns {@link AtomicLong} instance for this counter name, null if 
counter does not exist. */
+  /**
+   * Returns {@link AtomicLong} instance for this counter name, null if 
counter does not exist.
+   */
   public AtomicLong getCounter(String counterName) {
-    return this.counters.get(counterName);
+    return ScanMetricsUtil.getCounter(counters, counterName);
   }
 
+  /**
+   * Increments the counter with counterName by delta, does nothing if counter 
does not exist. If
+   * region level scan metrics are enabled then increments the counter 
corresponding to the current
+   * region being scanned. Please see {@link #moveToNextRegion()}.
+   */
   public void addToCounter(String counterName, long delta) {
-    AtomicLong c = this.counters.get(counterName);
-    if (c != null) {
-      c.addAndGet(delta);
+    ScanMetricsUtil.addToCounter(counters, counterName, delta);
+    if (this.currentRegionScanMetricsData != null) {
+      this.currentRegionScanMetricsData.addToCounter(counterName, delta);
     }
   }
 
   /**
-   * Get all of the values since the last time this function was called. 
Calling this function will
-   * reset all AtomicLongs in the instance back to 0.
-   * @return A Map of String -&gt; Long for metrics
+   * Get all the values combined for all the regions since the last time this 
function was called.
+   * Calling this function will reset all AtomicLongs in the instance back to 
0.
+   * @return A Map of String -> Long for metrics
    */
   public Map<String, Long> getMetricsMap() {
     return getMetricsMap(true);
   }
 
   /**
-   * Get all of the values. If reset is true, we will reset the all 
AtomicLongs back to 0.
+   * Get all the values combined for all the regions. If reset is true, we 
will reset all the
+   * AtomicLongs back to 0.
    * @param reset whether to reset the AtomicLongs to 0.
-   * @return A Map of String -&gt; Long for metrics
+   * @return A Map of String -> Long for metrics
    */
   public Map<String, Long> getMetricsMap(boolean reset) {
+    return ImmutableMap.copyOf(ScanMetricsUtil.collectMetrics(counters, 
reset));
+  }
+
+  /**
+   * Get values grouped by each region scanned since the last time this was 
called. Calling this
+   * function will reset all region level scan metrics counters back to 0.
+   * @return A Map of region -> (Map of metric name -> Long) for metrics
+   */
+  public Map<ScanMetricsRegionInfo, Map<String, Long>> 
collectMetricsByRegion() {
+    return collectMetricsByRegion(true);
+  }
+
+  /**
+   * Get values grouped by each region scanned. If reset is true, will reset 
all the region level
+   * scan metrics counters back to 0.
+   * @param reset whether to reset region level scan metric counters to 0.
+   * @return A Map of region -> (Map of metric name -> Long) for metrics
+   */
+  public Map<ScanMetricsRegionInfo, Map<String, Long>> 
collectMetricsByRegion(boolean reset) {
     // Create a builder
-    ImmutableMap.Builder<String, Long> builder = ImmutableMap.builder();
-    for (Map.Entry<String, AtomicLong> e : this.counters.entrySet()) {
-      long value = reset ? e.getValue().getAndSet(0) : e.getValue().get();
-      builder.put(e.getKey(), value);
+    ImmutableMap.Builder<ScanMetricsRegionInfo, Map<String, Long>> builder = 
ImmutableMap.builder();
+    for (RegionScanMetricsData regionScanMetricsData : 
this.regionScanMetricsData) {
+      if (
+        regionScanMetricsData.getScanMetricsRegionInfo()
+            == ScanMetricsRegionInfo.EMPTY_SCAN_METRICS_REGION_INFO
+      ) {
+        continue;
+      }
+      builder.put(regionScanMetricsData.getScanMetricsRegionInfo(),
+        regionScanMetricsData.collectMetrics(reset));
     }
-    // Build the immutable map so that people can't mess around with it.
     return builder.build();
   }
+
+  @Override
+  public String toString() {
+    return counters + "," + 
regionScanMetricsData.stream().map(RegionScanMetricsData::toString)
+      .collect(Collectors.joining(","));
+  }
+
+  /**
+   * Call this method after calling {@link #moveToNextRegion()} to populate 
server name and encoded
+   * region name details for the region being scanned and for which metrics 
are being collected at
+   * the moment.
+   */
+  public void initScanMetricsRegionInfo(String encodedRegionName, ServerName 
serverName) {
+    currentRegionScanMetricsData.initScanMetricsRegionInfo(encodedRegionName, 
serverName);
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
index 144c01de874..df99fd40338 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
 import org.apache.hadoop.hbase.mob.MobFileCache;
@@ -90,6 +91,12 @@ public class ClientSideRegionScanner extends 
AbstractClientScanner {
       initScanMetrics(scan);
     } else {
       this.scanMetrics = scanMetrics;
+      setIsScanMetricsByRegionEnabled(scan.isScanMetricsByRegionEnabled());
+    }
+    if (isScanMetricsByRegionEnabled()) {
+      this.scanMetrics.moveToNextRegion();
+      
this.scanMetrics.initScanMetricsRegionInfo(region.getRegionInfo().getEncodedName(),
 null);
+      // The server name will be null in scan metrics as this is a client side 
region scanner
     }
     region.startRegionOperation();
   }
@@ -110,8 +117,8 @@ public class ClientSideRegionScanner extends 
AbstractClientScanner {
       for (Cell cell : values) {
         resultSize += PrivateCellUtil.estimatedSerializedSizeOf(cell);
       }
-      this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
-      this.scanMetrics.countOfRowsScanned.incrementAndGet();
+      this.scanMetrics.addToCounter(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME, 
resultSize);
+      
this.scanMetrics.addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME,
 1);
     }
 
     return result;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java
index c2bc0f08d10..41bd0bd988b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
 import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
@@ -187,7 +188,7 @@ public class TableSnapshotScanner extends 
AbstractClientScanner {
         currentRegionScanner =
           new ClientSideRegionScanner(conf, fs, restoreDir, htd, hri, scan, 
scanMetrics);
         if (this.scanMetrics != null) {
-          this.scanMetrics.countOfRegions.incrementAndGet();
+          
this.scanMetrics.addToCounter(ScanMetrics.REGIONS_SCANNED_METRIC_NAME, 1);
         }
       }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index d97318337e0..fa0c2fd3ff1 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.VersionInfoUtil;
+import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
@@ -3516,10 +3517,12 @@ public class RSRpcServices extends 
HBaseRpcServicesBase<HRegionServer>
         if (trackMetrics) {
           // rather than increment yet another counter in StoreScanner, just 
set the value here
           // from block size progress before writing into the response
-          scannerContext.getMetrics().countOfBlockBytesScanned
-            .set(scannerContext.getBlockSizeProgress());
+          scannerContext.getMetrics().setCounter(
+            ServerSideScanMetrics.BLOCK_BYTES_SCANNED_KEY_METRIC_NAME,
+            scannerContext.getBlockSizeProgress());
           if (rpcCall != null) {
-            
scannerContext.getMetrics().fsReadTime.set(rpcCall.getFsReadTime());
+            
scannerContext.getMetrics().setCounter(ServerSideScanMetrics.FS_READ_TIME_METRIC_NAME,
+              rpcCall.getFsReadTime());
           }
           Map<String, Long> metrics = 
scannerContext.getMetrics().getMetricsMap();
           ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
index d7e4bb52a75..fed3bd1f992 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.ClientInternalHelper;
 import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
 import org.apache.hadoop.hbase.filter.FilterWrapper;
 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
 import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
@@ -645,7 +646,8 @@ public class RegionScannerImpl implements RegionScanner, 
Shipper, RpcCallback {
       return;
     }
 
-    scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet();
+    scannerContext.getMetrics()
+      
.addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME, 1);
   }
 
   private void incrementCountOfRowsScannedMetric(ScannerContext 
scannerContext) {
@@ -653,7 +655,8 @@ public class RegionScannerImpl implements RegionScanner, 
Shipper, RpcCallback {
       return;
     }
 
-    scannerContext.getMetrics().countOfRowsScanned.incrementAndGet();
+    scannerContext.getMetrics()
+      
.addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, 1);
   }
 
   /** Returns true when the joined heap may have data for the current row */
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java
index 0f7258f3435..6678e94b7e0 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java
@@ -17,19 +17,28 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME;
+import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
+import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
+import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_CALLS_METRIC_NAME;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ForkJoinPool;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -93,12 +102,12 @@ public class TestAsyncTableScanMetrics {
   @BeforeClass
   public static void setUp() throws Exception {
     UTIL.startMiniCluster(3);
-    // Create 3 rows in the table, with rowkeys starting with "zzz*" so that
-    // scan are forced to hit all the regions.
+    // Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" 
and "zzz*" so that
+    // the scan hits all the regions and not all rows lie in a single region
     try (Table table = UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
-      table.put(Arrays.asList(new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, 
VALUE),
-        new Put(Bytes.toBytes("zzz2")).addColumn(CF, CQ, VALUE),
-        new Put(Bytes.toBytes("zzz3")).addColumn(CF, CQ, VALUE)));
+      table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, 
VALUE),
+        new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE),
+        new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE)));
     }
     CONN = 
ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
     NUM_REGIONS = UTIL.getHBaseCluster().getRegions(TABLE_NAME).size();
@@ -141,25 +150,101 @@ public class TestAsyncTableScanMetrics {
   }
 
   @Test
-  public void testNoScanMetrics() throws Exception {
+  public void testScanMetricsDisabled() throws Exception {
     Pair<List<Result>, ScanMetrics> pair = method.scan(new Scan());
     assertEquals(3, pair.getFirst().size());
+    // Assert no scan metrics
     assertNull(pair.getSecond());
   }
 
   @Test
-  public void testScanMetrics() throws Exception {
-    Pair<List<Result>, ScanMetrics> pair = method.scan(new 
Scan().setScanMetricsEnabled(true));
+  public void testScanMetricsWithScanMetricsByRegionDisabled() throws 
Exception {
+    Scan scan = new Scan();
+    scan.setScanMetricsEnabled(true);
+    Pair<List<Result>, ScanMetrics> pair = method.scan(scan);
     List<Result> results = pair.getFirst();
     assertEquals(3, results.size());
-    long bytes = results.stream().flatMap(r -> 
Arrays.asList(r.rawCells()).stream())
-      .mapToLong(c -> PrivateCellUtil.estimatedSerializedSizeOf(c)).sum();
+    long bytes = getBytesOfResults(results);
+    ScanMetrics scanMetrics = pair.getSecond();
+    assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get());
+    assertEquals(bytes, scanMetrics.countOfBytesInResults.get());
+    assertEquals(NUM_REGIONS, scanMetrics.countOfRPCcalls.get());
+    // Assert scan metrics have not been collected by region
+    assertTrue(scanMetrics.collectMetricsByRegion().isEmpty());
+  }
+
+  @Test
+  public void testScanMetricsByRegionForSingleRegionScan() throws Exception {
+    Scan scan = new Scan();
+    scan.withStartRow(Bytes.toBytes("zzz1"), true);
+    scan.withStopRow(Bytes.toBytes("zzz1"), true);
+    scan.setEnableScanMetricsByRegion(true);
+    Pair<List<Result>, ScanMetrics> pair = method.scan(scan);
+    List<Result> results = pair.getFirst();
+    assertEquals(1, results.size());
+    long bytes = getBytesOfResults(results);
+    ScanMetrics scanMetrics = pair.getSecond();
+    assertEquals(1, scanMetrics.countOfRegions.get());
+    assertEquals(bytes, scanMetrics.countOfBytesInResults.get());
+    assertEquals(1, scanMetrics.countOfRPCcalls.get());
+    // Assert scan metrics by region were collected for the region scanned
+    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+      scanMetrics.collectMetricsByRegion(false);
+    assertEquals(1, scanMetricsByRegion.size());
+    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : 
scanMetricsByRegion
+      .entrySet()) {
+      ScanMetricsRegionInfo smri = entry.getKey();
+      Map<String, Long> metrics = entry.getValue();
+      assertNotNull(smri.getServerName());
+      assertNotNull(smri.getEncodedRegionName());
+      // Assert overall scan metrics and scan metrics by region should be 
equal as only 1 region
+      // was scanned.
+      assertEquals(scanMetrics.getMetricsMap(false), metrics);
+    }
+  }
+
+  @Test
+  public void testScanMetricsByRegionForMultiRegionScan() throws Exception {
+    Scan scan = new Scan();
+    scan.setEnableScanMetricsByRegion(true);
+    Pair<List<Result>, ScanMetrics> pair = method.scan(scan);
+    List<Result> results = pair.getFirst();
+    assertEquals(3, results.size());
+    long bytes = getBytesOfResults(results);
     ScanMetrics scanMetrics = pair.getSecond();
+    Map<String, Long> overallMetrics = scanMetrics.getMetricsMap(false);
+    assertEquals(NUM_REGIONS, (long) 
overallMetrics.get(REGIONS_SCANNED_METRIC_NAME));
     assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get());
+    assertEquals(bytes, (long) 
overallMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
     assertEquals(bytes, scanMetrics.countOfBytesInResults.get());
+    assertEquals(NUM_REGIONS, (long) 
overallMetrics.get(RPC_CALLS_METRIC_NAME));
     assertEquals(NUM_REGIONS, scanMetrics.countOfRPCcalls.get());
-    // also assert a server side metric to ensure that we have published them 
into the client side
-    // metrics.
-    assertEquals(3, scanMetrics.countOfRowsScanned.get());
+    // Assert scan metrics by region were collected for each region scanned
+    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+      scanMetrics.collectMetricsByRegion(false);
+    assertEquals(NUM_REGIONS, scanMetricsByRegion.size());
+    int rowsScannedAcrossAllRegions = 0;
+    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : 
scanMetricsByRegion
+      .entrySet()) {
+      ScanMetricsRegionInfo smri = entry.getKey();
+      Map<String, Long> perRegionMetrics = entry.getValue();
+      assertNotNull(smri.getServerName());
+      assertNotNull(smri.getEncodedRegionName());
+      assertEquals(1, (long) 
perRegionMetrics.get(REGIONS_SCANNED_METRIC_NAME));
+      if (perRegionMetrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) == 1) {
+        bytes = getBytesOfResults(Collections.singletonList(results.get(0)));
+        assertEquals(bytes, (long) 
perRegionMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
+        rowsScannedAcrossAllRegions++;
+      } else {
+        assertEquals(0, (long) 
perRegionMetrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+        assertEquals(0, (long) 
perRegionMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
+      }
+    }
+    assertEquals(3, rowsScannedAcrossAllRegions);
+  }
+
+  static long getBytesOfResults(List<Result> results) {
+    return results.stream().flatMap(r -> Arrays.asList(r.rawCells()).stream())
+      .mapToLong(c -> PrivateCellUtil.estimatedSerializedSizeOf(c)).sum();
   }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetricsWithScannerSuspending.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetricsWithScannerSuspending.java
new file mode 100644
index 00000000000..6a355b08365
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetricsWithScannerSuspending.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static 
org.apache.hadoop.hbase.client.TestAsyncTableScanMetrics.getBytesOfResults;
+import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME;
+import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
+import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_CALLS_METRIC_NAME;
+import static 
org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableScanMetricsWithScannerSuspending {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    
HBaseClassTestRule.forClass(TestAsyncTableScanMetricsWithScannerSuspending.class);
+
+  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+  private static final TableName TABLE_NAME =
+    
TableName.valueOf(TestAsyncTableScanMetricsWithScannerSuspending.class.getSimpleName());
+
+  private static final byte[] CF = Bytes.toBytes("cf");
+
+  private static final byte[] CQ = Bytes.toBytes("cq");
+
+  private static final byte[] VALUE = Bytes.toBytes("value");
+
+  private static AsyncConnection CONN;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.startMiniCluster(1);
+    // Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" 
and "zzz*" so that
+    // the scan hits all the regions and not all rows lie in a single region
+    try (Table table = UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
+      table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, 
VALUE),
+        new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE),
+        new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE)));
+    }
+    CONN = 
ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    Closeables.close(CONN, true);
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testScanMetricsByRegionWithScannerSuspending() throws Exception {
+    // Setup scan
+    Scan scan = new Scan();
+    scan.withStartRow(Bytes.toBytes("xxx1"), true);
+    scan.withStopRow(Bytes.toBytes("zzz1"), true);
+    scan.setEnableScanMetricsByRegion(true);
+    scan.setMaxResultSize(1);
+
+    // Prepare scanner
+    final AtomicInteger rowsReadCounter = new AtomicInteger(0);
+    AsyncTableResultScanner scanner = new AsyncTableResultScanner(TABLE_NAME, 
scan, 1) {
+      @Override
+      public void onNext(Result[] results, ScanController controller) {
+        rowsReadCounter.addAndGet(results.length);
+        super.onNext(results, controller);
+      }
+    };
+
+    // Do the scan so that rows get loaded in the scanner (consumer)
+    CONN.getTable(TABLE_NAME).scan(scan, scanner);
+
+    List<Result> results = new ArrayList<>();
+    int expectedTotalRows = 3;
+    // Assert exactly one more row is loaded on each suspension, as maxCacheSize is 1 byte
+    for (int i = 1; i <= expectedTotalRows; i++) {
+      UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
+
+        @Override
+        public boolean evaluate() throws Exception {
+          return scanner.isSuspended();
+        }
+
+        @Override
+        public String explainFailure() throws Exception {
+          return "The given scanner has not been suspended in time";
+        }
+      });
+      assertTrue(scanner.isSuspended());
+      assertEquals(i, rowsReadCounter.get());
+      results.add(scanner.next());
+    }
+    Assert.assertNull(scanner.next());
+
+    // Assert on overall scan metrics and scan metrics by region
+    ScanMetrics scanMetrics = scanner.getScanMetrics();
+    // Assert on overall scan metrics
+    long bytes = getBytesOfResults(results);
+    Map<String, Long> overallMetrics = scanMetrics.getMetricsMap(false);
+    assertEquals(3, (long) overallMetrics.get(REGIONS_SCANNED_METRIC_NAME));
+    assertEquals(bytes, (long) 
overallMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
+    // 1 Extra RPC call per region where no row is returned but 
moreResultsInRegion is set to false
+    assertEquals(6, (long) overallMetrics.get(RPC_CALLS_METRIC_NAME));
+    // Assert scan metrics by region were collected for each region scanned
+    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+      scanMetrics.collectMetricsByRegion(false);
+    assertEquals(3, scanMetricsByRegion.size());
+    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : 
scanMetricsByRegion
+      .entrySet()) {
+      ScanMetricsRegionInfo smri = entry.getKey();
+      Map<String, Long> perRegionMetrics = entry.getValue();
+      assertNotNull(smri.getServerName());
+      assertNotNull(smri.getEncodedRegionName());
+      assertEquals(1, (long) 
perRegionMetrics.get(REGIONS_SCANNED_METRIC_NAME));
+      assertEquals(1, (long) 
perRegionMetrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+      bytes = getBytesOfResults(Collections.singletonList(results.get(0)));
+      assertEquals(bytes, (long) 
perRegionMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
+      assertEquals(2, (long) perRegionMetrics.get(RPC_CALLS_METRIC_NAME));
+    }
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideRegionScanner.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideRegionScanner.java
index 6da74bf031d..253e61f995c 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideRegionScanner.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideRegionScanner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static 
org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -29,6 +30,7 @@ import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -37,6 +39,8 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
 import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.IndexOnlyLruBlockCache;
@@ -46,6 +50,7 @@ import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -179,6 +184,86 @@ public class TestClientSideRegionScanner {
     }
   }
 
+  @Test
+  public void testScanMetricsDisabled() throws IOException {
+    Configuration copyConf = new Configuration(conf);
+    Scan scan = new Scan();
+    try (ClientSideRegionScanner clientSideRegionScanner =
+      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, 
null)) {
+      clientSideRegionScanner.next();
+      assertNull(clientSideRegionScanner.getScanMetrics());
+    }
+  }
+
+  private void testScanMetricsWithScanMetricsByRegionDisabled(ScanMetrics 
scanMetrics)
+    throws IOException {
+    Configuration copyConf = new Configuration(conf);
+    Scan scan = new Scan();
+    scan.setScanMetricsEnabled(true);
+    TEST_UTIL.getAdmin().flush(TableName.META_TABLE_NAME);
+    try (ClientSideRegionScanner clientSideRegionScanner =
+      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, 
scanMetrics)) {
+      clientSideRegionScanner.next();
+      ScanMetrics scanMetricsFromScanner = 
clientSideRegionScanner.getScanMetrics();
+      assertNotNull(scanMetricsFromScanner);
+      if (scanMetrics != null) {
+        Assert.assertSame(scanMetrics, scanMetricsFromScanner);
+      }
+      Map<String, Long> metricsMap = 
scanMetricsFromScanner.getMetricsMap(false);
+      Assert.assertTrue(metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) 
> 0);
+      
Assert.assertTrue(scanMetricsFromScanner.collectMetricsByRegion(false).isEmpty());
+    }
+  }
+
+  @Test
+  public void testScanMetricsNotAsInputWithScanMetricsByRegionDisabled() 
throws IOException {
+    testScanMetricsWithScanMetricsByRegionDisabled(null);
+  }
+
+  @Test
+  public void testScanMetricsAsInputWithScanMetricsByRegionDisabled() throws 
IOException {
+    testScanMetricsWithScanMetricsByRegionDisabled(new ScanMetrics());
+  }
+
+  private void testScanMetricByRegion(ScanMetrics scanMetrics) throws 
IOException {
+    Configuration copyConf = new Configuration(conf);
+    Scan scan = new Scan();
+    scan.setEnableScanMetricsByRegion(true);
+    TEST_UTIL.getAdmin().flush(TableName.META_TABLE_NAME);
+    try (ClientSideRegionScanner clientSideRegionScanner =
+      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, 
scanMetrics)) {
+      clientSideRegionScanner.next();
+      ScanMetrics scanMetricsFromScanner = 
clientSideRegionScanner.getScanMetrics();
+      assertNotNull(scanMetricsFromScanner);
+      if (scanMetrics != null) {
+        Assert.assertSame(scanMetrics, scanMetricsFromScanner);
+      }
+      Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+        scanMetricsFromScanner.collectMetricsByRegion();
+      Assert.assertEquals(1, scanMetricsByRegion.size());
+      for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : 
scanMetricsByRegion
+        .entrySet()) {
+        ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+        Map<String, Long> metricsMap = entry.getValue();
+        Assert.assertEquals(hri.getEncodedName(), 
scanMetricsRegionInfo.getEncodedRegionName());
+        Assert.assertNull(scanMetricsRegionInfo.getServerName());
+        
Assert.assertTrue(metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) > 0);
+        Assert.assertEquals((long) 
metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME),
+          scanMetricsFromScanner.countOfRowsScanned.get());
+      }
+    }
+  }
+
+  @Test
+  public void testScanMetricsByRegionWithoutScanMetricsAsInput() throws 
IOException {
+    testScanMetricByRegion(null);
+  }
+
+  @Test
+  public void testScanMetricsByRegionWithScanMetricsAsInput() throws 
IOException {
+    testScanMetricByRegion(new ScanMetrics());
+  }
+
   private static Put createPut(int rowAsInt) {
     byte[] row = Bytes.toBytes(rowAsInt);
     Put put = new Put(row);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index 02d80eb57f5..7f1dc69af2a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -17,9 +17,13 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
+import static 
org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
+
 import com.codahale.metrics.Counter;
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -34,9 +38,11 @@ import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.StartTestingClusterOption;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -89,6 +95,7 @@ public class TestReplicasClient {
   private static final byte[] f = HConstants.CATALOG_FAMILY;
 
   private final static int REFRESH_PERIOD = 1000;
+  private static ServerName rsServerName;
 
   /**
    * This copro is used to synchronize the tests.
@@ -215,6 +222,9 @@ public class TestReplicasClient {
     Configuration c = new Configuration(HTU.getConfiguration());
     c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
     LOG.info("Master has stopped");
+
+    rsServerName = HTU.getHBaseCluster().getRegionServer(0).getServerName();
+    Assert.assertNotNull(rsServerName);
   }
 
   @AfterClass
@@ -615,4 +625,69 @@ public class TestReplicasClient {
       closeRegion(hriSecondary);
     }
   }
+
+  @Test
+  public void testScanMetricsByRegion() throws Exception {
+    byte[] b1 = Bytes.toBytes("testScanMetricsByRegion");
+    openRegion(hriSecondary);
+
+    try {
+      Put p = new Put(b1);
+      p.addColumn(f, b1, b1);
+      table.put(p);
+      LOG.info("Put done");
+      flushRegion(hriPrimary);
+
+      // Sleep for 2 * REFRESH_PERIOD so that flushed data is visible to 
secondary replica
+      Thread.sleep(2 * REFRESH_PERIOD);
+
+      // Explicitly read replica 0
+      Scan scan = new Scan();
+      scan.setEnableScanMetricsByRegion(true);
+      scan.withStartRow(b1, true);
+      scan.withStopRow(b1, true);
+      // Assert row was read from primary replica along with asserting scan 
metrics by region
+      assertScanMetrics(scan, hriPrimary, false);
+      LOG.info("Scanned primary replica");
+
+      // Read from region replica
+      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
+      scan = new Scan();
+      scan.setEnableScanMetricsByRegion(true);
+      scan.withStartRow(b1, true);
+      scan.withStopRow(b1, true);
+      scan.setConsistency(Consistency.TIMELINE);
+      // Assert row was read from secondary replica along with asserting scan 
metrics by region
+      assertScanMetrics(scan, hriSecondary, true);
+      LOG.info("Scanned secondary replica ");
+    } finally {
+      SlowMeCopro.getPrimaryCdl().get().countDown();
+      Delete d = new Delete(b1);
+      table.delete(d);
+      closeRegion(hriSecondary);
+    }
+  }
+
+  private void assertScanMetrics(Scan scan, RegionInfo regionInfo, boolean 
isStale)
+    throws IOException {
+    try (ResultScanner rs = table.getScanner(scan);) {
+      for (Result r : rs) {
+        Assert.assertEquals(isStale, r.isStale());
+        Assert.assertFalse(r.isEmpty());
+      }
+      Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+        rs.getScanMetrics().collectMetricsByRegion(false);
+      Assert.assertEquals(1, scanMetricsByRegion.size());
+      for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : 
scanMetricsByRegion
+        .entrySet()) {
+        ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+        Map<String, Long> metrics = entry.getValue();
+        Assert.assertEquals(rsServerName, 
scanMetricsRegionInfo.getServerName());
+        Assert.assertEquals(regionInfo.getEncodedName(),
+          scanMetricsRegionInfo.getEncodedRegionName());
+        Assert.assertEquals(1, (long) 
metrics.get(REGIONS_SCANNED_METRIC_NAME));
+        Assert.assertEquals(1, (long) 
metrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+      }
+    }
+  }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanAttributes.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanAttributes.java
new file mode 100644
index 00000000000..36935511734
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanAttributes.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Verifies how the "scan metrics" and "scan metrics by region" flags of {@link Scan} are coupled.
+ */
+@Category({ ClientTests.class, SmallTests.class })
+public class TestScanAttributes {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestScanAttributes.class);
+
+  @Test
+  public void testCoEnableAndCoDisableScanMetricsAndScanMetricsByRegion() {
+    Scan scanUnderTest = new Scan();
+    // A freshly created scan has both flags switched off
+    Assert.assertFalse(scanUnderTest.isScanMetricsEnabled());
+    Assert.assertFalse(scanUnderTest.isScanMetricsByRegionEnabled());
+
+    // Turning on scan metrics by region must turn on scan metrics as well
+    scanUnderTest.setEnableScanMetricsByRegion(true);
+    Assert.assertTrue(scanUnderTest.isScanMetricsEnabled());
+    Assert.assertTrue(scanUnderTest.isScanMetricsByRegionEnabled());
+
+    // Turning off scan metrics must turn off scan metrics by region as well
+    scanUnderTest.setScanMetricsEnabled(false);
+    Assert.assertFalse(scanUnderTest.isScanMetricsEnabled());
+    Assert.assertFalse(scanUnderTest.isScanMetricsByRegionEnabled());
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java
new file mode 100644
index 00000000000..bcbf625f1e1
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
+import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
+import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_RETRIES_METRIC_NAME;
+import static 
org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@Category({ ClientTests.class, MediumTests.class })
+public class TestTableScanMetrics extends FromClientSideBase {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestTableScanMetrics.class);
+
+  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+
+  private static final TableName TABLE_NAME =
+    TableName.valueOf(TestTableScanMetrics.class.getSimpleName());
+
+  private static final byte[] CF = Bytes.toBytes("cf");
+
+  private static final byte[] CQ = Bytes.toBytes("cq");
+
+  private static final byte[] VALUE = Bytes.toBytes("value");
+
+  private static final Random RAND = new Random(11);
+
+  private static int NUM_REGIONS;
+
+  private static Connection CONN;
+
+  /**
+   * Supplies the test parameters: a display name and the template scan, once for a forward scan
+   * and once for a reversed scan.
+   */
+  @Parameters(name = "{index}: scanner={0}")
+  public static List<Object[]> params() {
+    Object[] forward = { "ForwardScanner", new Scan() };
+    Object[] reverse = { "ReverseScanner", new Scan().setReversed(true) };
+    return Arrays.asList(forward, reverse);
+  }
+
+  @Parameter(0)
+  public String scannerName;
+
+  @Parameter(1)
+  public Scan originalScan;
+
+  /**
+   * Starts the mini cluster, creates the multi-region test table with three rows, opens the
+   * shared connection and records how many regions the table has.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    // Start the minicluster
+    TEST_UTIL.startMiniCluster(2);
+    // Three rows with rowkeys starting with "xxx", "yyy" and "zzz", so that a scan hits all the
+    // regions while the rows themselves do not all lie in a single region
+    Put xRow = new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, VALUE);
+    Put yRow = new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE);
+    Put zRow = new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE);
+    try (Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
+      table.put(Arrays.asList(xRow, yRow, zRow));
+    }
+    CONN = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
+    NUM_REGIONS = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).size();
+  }
+
+  /**
+   * Closes the shared connection opened in {@code setUp()} and shuts the mini cluster down. The
+   * cluster is shut down even if closing the connection fails.
+   */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    try {
+      // CONN is null when setUp() failed before the connection was created
+      if (CONN != null) {
+        CONN.close();
+      }
+    } finally {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+  /**
+   * Copies the parameterized template scan and bounds it (both ends inclusive) by the two given
+   * rows. For a reversed template scan the larger row becomes the start row and the smaller row
+   * the stop row; otherwise it is the other way around.
+   * @param smallerRow lower bound of the scan range
+   * @param largerRow  upper bound of the scan range
+   * @return a new scan covering the requested range in the direction of the template scan
+   */
+  private Scan generateScan(byte[] smallerRow, byte[] largerRow) throws IOException {
+    Scan boundedScan = new Scan(originalScan);
+    boolean reversed = originalScan.isReversed();
+    byte[] startRow = reversed ? largerRow : smallerRow;
+    byte[] stopRow = reversed ? smallerRow : largerRow;
+    boundedScan.withStartRow(startRow, true);
+    boundedScan.withStopRow(stopRow, true);
+    return boundedScan;
+  }
+
+  /**
+   * Runs the scan against the test table, asserting that every result is non-empty and that the
+   * number of results matches the expectation.
+   * @param scan          scan to run
+   * @param expectedCount number of rows the scan is expected to return
+   * @return the scan metrics reported by the scanner once it has been fully iterated
+   */
+  private ScanMetrics assertScannedRowsAndGetScanMetrics(Scan scan, int expectedCount)
+    throws IOException {
+    ScanMetrics collected;
+    int rowsSeen = 0;
+    try (Table table = CONN.getTable(TABLE_NAME);
+      ResultScanner scanner = table.getScanner(scan)) {
+      for (Result row : scanner) {
+        Assert.assertFalse(row.isEmpty());
+        rowsSeen++;
+      }
+      // Read the metrics while the scanner is still open
+      collected = scanner.getScanMetrics();
+    }
+    Assert.assertEquals(expectedCount, rowsSeen);
+    return collected;
+  }
+
+  /** Without scan metrics enabled on the scan, the scanner must not report any metrics. */
+  @Test
+  public void testScanMetricsDisabled() throws Exception {
+    byte[] firstRow = Bytes.toBytes("xxx1");
+    byte[] lastRow = Bytes.toBytes("zzz1");
+    Scan scan = generateScan(firstRow, lastRow);
+    Assert.assertNull(assertScannedRowsAndGetScanMetrics(scan, 3));
+  }
+
+  /**
+   * With only scan metrics enabled, the aggregate counters are populated while the per-region
+   * breakdown stays empty.
+   */
+  @Test
+  public void testScanMetricsWithScanMetricByRegionDisabled() throws Exception {
+    final int expectedRows = 3;
+    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
+    scan.setScanMetricsEnabled(true);
+    ScanMetrics metrics = assertScannedRowsAndGetScanMetrics(scan, expectedRows);
+    Assert.assertNotNull(metrics);
+    Map<String, Long> counters = metrics.getMetricsMap(false);
+    // The test data has 1 row per region in the scan range, so regions scanned == rows scanned
+    Assert.assertEquals(expectedRows, metrics.countOfRegions.get());
+    Assert.assertEquals(expectedRows, (long) counters.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+    Assert.assertTrue(metrics.collectMetricsByRegion().isEmpty());
+  }
+
+  /**
+   * Collecting the metrics map with the default (resetting) variant must hand out the counters
+   * and leave the underlying counters at 0.
+   */
+  @Test
+  public void testScanMetricsResetWithScanMetricsByRegionDisabled() throws Exception {
+    final int expectedRows = 3;
+    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
+    scan.setScanMetricsEnabled(true);
+    ScanMetrics metrics = assertScannedRowsAndGetScanMetrics(scan, expectedRows);
+    Assert.assertNotNull(metrics);
+
+    // By default counters are collected with reset as true
+    Map<String, Long> counters = metrics.getMetricsMap();
+    Assert.assertEquals(expectedRows, (long) counters.get(REGIONS_SCANNED_METRIC_NAME));
+    Assert.assertEquals(expectedRows, (long) counters.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+
+    // After the resetting collection above, the live counters should read 0
+    Assert.assertEquals(0, metrics.countOfRegions.get());
+    Assert.assertEquals(0, metrics.countOfRowsScanned.get());
+  }
+
+  /**
+   * Scans a single row and checks that the aggregate metrics and the only per-region entry
+   * report the same counters.
+   */
+  @Test
+  public void testScanMetricsByRegionForSingleRegionScan() throws Exception {
+    final int expectedRows = 1;
+    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("xxx1"));
+    scan.setEnableScanMetricsByRegion(true);
+    ScanMetrics metrics = assertScannedRowsAndGetScanMetrics(scan, expectedRows);
+    Assert.assertNotNull(metrics);
+
+    Map<String, Long> overall = metrics.getMetricsMap(false);
+    Assert.assertEquals(expectedRows, (long) overall.get(REGIONS_SCANNED_METRIC_NAME));
+    Assert.assertEquals(expectedRows, (long) overall.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+
+    Map<ScanMetricsRegionInfo, Map<String, Long>> byRegion =
+      metrics.collectMetricsByRegion(false);
+    Assert.assertEquals(expectedRows, byRegion.size());
+    byRegion.forEach((region, counters) -> {
+      Assert.assertNotNull(region.getEncodedRegionName());
+      Assert.assertNotNull(region.getServerName());
+      // Only a single row is scanned, so the overall metrics match the per-region metrics
+      Assert.assertEquals(expectedRows, (long) counters.get(REGIONS_SCANNED_METRIC_NAME));
+      Assert.assertEquals(expectedRows,
+        (long) counters.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+    });
+  }
+
+  /**
+   * Scans the whole table and checks that every region shows up in the per-region metrics, each
+   * with either 0 or 1 rows scanned, adding up to the three rows stored in the table.
+   */
+  @Test
+  public void testScanMetricsByRegionForMultiRegionScan() throws Exception {
+    final int expectedRows = 3;
+    Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+    scan.setEnableScanMetricsByRegion(true);
+    ScanMetrics metrics = assertScannedRowsAndGetScanMetrics(scan, expectedRows);
+    Assert.assertNotNull(metrics);
+    Assert.assertEquals(NUM_REGIONS, metrics.countOfRegions.get());
+    Assert.assertEquals(expectedRows, metrics.countOfRowsScanned.get());
+
+    Map<ScanMetricsRegionInfo, Map<String, Long>> byRegion =
+      metrics.collectMetricsByRegion(false);
+    Assert.assertEquals(NUM_REGIONS, byRegion.size());
+    int regionsWithOneRow = 0;
+    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> regionEntry : byRegion.entrySet()) {
+      ScanMetricsRegionInfo region = regionEntry.getKey();
+      Map<String, Long> counters = regionEntry.getValue();
+      Assert.assertNotNull(region.getEncodedRegionName());
+      Assert.assertNotNull(region.getServerName());
+      Assert.assertEquals(1, (long) counters.get(REGIONS_SCANNED_METRIC_NAME));
+      long rowsInRegion = counters.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
+      if (rowsInRegion == 1) {
+        regionsWithOneRow++;
+      } else {
+        // A region holds at most one of the test rows
+        Assert.assertEquals(0, rowsInRegion);
+      }
+    }
+    Assert.assertEquals(expectedRows, regionsWithOneRow);
+  }
+
+  /**
+   * Collects the per-region metrics once with reset and once without, and checks that the second
+   * collection still lists the same number of regions but with all inspected counters at 0.
+   */
+  @Test
+  public void testScanMetricsByRegionReset() throws Exception {
+    final int expectedRows = 3;
+    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
+    scan.setEnableScanMetricsByRegion(true);
+    ScanMetrics metrics = assertScannedRowsAndGetScanMetrics(scan, expectedRows);
+    Assert.assertNotNull(metrics);
+
+    // First collection resets the counters; we scan 1 row per region
+    assertRegionsAndRowsScannedPerRegion(metrics.collectMetricsByRegion(), expectedRows, 1);
+
+    // Counters have already been reset: same number of entries as before, but all values are 0
+    assertRegionsAndRowsScannedPerRegion(metrics.collectMetricsByRegion(false), expectedRows, 0);
+  }
+
+  /**
+   * Asserts that the per-region metrics hold the expected number of entries and that, for each
+   * entry, both the regions-scanned and the rows-scanned counters equal the given value.
+   */
+  private void assertRegionsAndRowsScannedPerRegion(
+    Map<ScanMetricsRegionInfo, Map<String, Long>> byRegion, int expectedEntries,
+    long expectedCounterValue) {
+    Assert.assertEquals(expectedEntries, byRegion.size());
+    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> regionEntry : byRegion.entrySet()) {
+      ScanMetricsRegionInfo region = regionEntry.getKey();
+      Map<String, Long> counters = regionEntry.getValue();
+      Assert.assertNotNull(region.getEncodedRegionName());
+      Assert.assertNotNull(region.getServerName());
+      Assert.assertEquals(expectedCounterValue, (long) counters.get(REGIONS_SCANNED_METRIC_NAME));
+      Assert.assertEquals(expectedCounterValue,
+        (long) counters.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+    }
+  }
+
+  /**
+   * Scans a loaded multi-region table on one thread while a second thread periodically collects
+   * (and resets) the per-region scan metrics. The sum of everything collected concurrently must
+   * equal the per-region metrics of the same scan executed and collected from a single thread.
+   */
+  @Test
+  public void testConcurrentUpdatesAndResetOfScanMetricsByRegion() throws Exception {
+    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
+    TableName tableName = TableName.valueOf(TestTableScanMetrics.class.getSimpleName()
+      + "_testConcurrentUpdatesAndResetToScanMetricsByRegion");
+    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
+      TEST_UTIL.loadTable(table, CF);
+
+      Map<ScanMetricsRegionInfo, Map<String, Long>> concurrentScanMetricsByRegion =
+        new HashMap<>();
+
+      // Trigger two concurrent threads one of which scans the table and other periodically
+      // collects the scan metrics (along with resetting the counters to 0).
+      Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+      scan.setEnableScanMetricsByRegion(true);
+      scan.setCaching(2);
+      try (ResultScanner rs = table.getScanner(scan)) {
+        ScanMetrics scanMetrics = rs.getScanMetrics();
+        AtomicInteger rowsScanned = new AtomicInteger(0);
+        CountDownLatch scanDone = new CountDownLatch(1);
+        CountDownLatch collectorDone = new CountDownLatch(1);
+        Runnable tableScanner = () -> {
+          try {
+            for (Result r : rs) {
+              Assert.assertFalse(r.isEmpty());
+              rowsScanned.incrementAndGet();
+            }
+          } finally {
+            // Always release the waiters, otherwise a failed scan would hang the test forever.
+            // A failure here surfaces through the row count assertion below.
+            scanDone.countDown();
+          }
+        };
+        Runnable periodicCollector =
+          getPeriodicScanMetricsCollector(scanMetrics, concurrentScanMetricsByRegion, scanDone);
+        Runnable metricsCollector = () -> {
+          try {
+            periodicCollector.run();
+          } finally {
+            collectorDone.countDown();
+          }
+        };
+        executor.execute(tableScanner);
+        executor.execute(metricsCollector);
+        scanDone.await();
+        // The collector writes into the plain HashMap; wait until it has finished so that the
+        // merge below neither races with it nor misses its last update
+        collectorDone.await();
+        // Merge leftover scan metrics
+        mergeScanMetricsByRegion(scanMetrics.collectMetricsByRegion(),
+          concurrentScanMetricsByRegion);
+        Assert.assertEquals(HBaseTestingUtil.ROWS.length, rowsScanned.get());
+      }
+
+      Map<ScanMetricsRegionInfo, Map<String, Long>> expectedScanMetricsByRegion;
+
+      // Collect scan metrics by region from single thread. Assert that concurrent scan
+      // and metrics collection works as expected.
+      scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+      scan.setEnableScanMetricsByRegion(true);
+      scan.setCaching(2);
+      try (ResultScanner rs = table.getScanner(scan)) {
+        ScanMetrics scanMetrics = rs.getScanMetrics();
+        int rowsScanned = 0;
+        for (Result r : rs) {
+          Assert.assertFalse(r.isEmpty());
+          rowsScanned++;
+        }
+        Assert.assertEquals(HBaseTestingUtil.ROWS.length, rowsScanned);
+        expectedScanMetricsByRegion = scanMetrics.collectMetricsByRegion();
+        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : expectedScanMetricsByRegion
+          .entrySet()) {
+          ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+          Map<String, Long> metricsMap = entry.getValue();
+          Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
+          Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
+          Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+          // Each region will have 26 * 26 + 26 + 1 rows except last region which will have 1 row
+          long rowsScannedFromMetrics = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
+          Assert.assertTrue(
+            rowsScannedFromMetrics == 1 || rowsScannedFromMetrics == (26 * 26 + 26 + 1));
+        }
+      }
+
+      // Assert on scan metrics by region
+      Assert.assertEquals(expectedScanMetricsByRegion, concurrentScanMetricsByRegion);
+    } finally {
+      // Do not leak the worker threads into the other tests of this class
+      executor.shutdownNow();
+      TEST_UTIL.deleteTable(tableName);
+    }
+  }
+
+  /**
+   * Moves the region being scanned to another server right after the first row has been read
+   * and checks that the per-region metrics contain two entries for the moved region (one per
+   * server) whose row counts add up to the rows of that region.
+   */
+  @Test
+  public void testScanMetricsByRegionWithRegionMove() throws Exception {
+    TableName tableName = TableName.valueOf(
+      TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionMove");
+    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
+      TEST_UTIL.loadTable(table, CF);
+
+      // Scan 2 regions with start row keys: bbb and ccc
+      byte[] bbb = Bytes.toBytes("bbb");
+      byte[] ccc = Bytes.toBytes("ccc");
+      byte[] ddc = Bytes.toBytes("ddc");
+
+      // Count the rows loaded by loadTable() (HBaseTestingUtil.ROWS) that fall in [bbb, ccc)
+      long expectedRowsInMovedRegion = 0;
+      for (byte[] loadedRow : HBaseTestingUtil.ROWS) {
+        boolean inRange =
+          Bytes.compareTo(loadedRow, bbb) >= 0 && Bytes.compareTo(loadedRow, ccc) < 0;
+        if (inRange) {
+          expectedRowsInMovedRegion++;
+        }
+      }
+
+      // Initialize scan with maxResultSize as size of 50 rows.
+      Scan scan = generateScan(bbb, ddc);
+      scan.setEnableScanMetricsByRegion(true);
+      scan.setMaxResultSize(8000);
+
+      try (ResultScanner rs = table.getScanner(scan)) {
+        byte[] movedRegion = null;
+        boolean regionMoved = false;
+        for (Result r : rs) {
+          // Move the region hosting the very first row returned, then just drain the scanner
+          if (!regionMoved) {
+            movedRegion = moveRegion(tableName, r.getRow());
+            regionMoved = true;
+          }
+        }
+        Assert.assertNotNull(movedRegion);
+        String movedRegionName = Bytes.toString(movedRegion);
+
+        Map<ScanMetricsRegionInfo, Map<String, Long>> byRegion =
+          rs.getScanMetrics().collectMetricsByRegion();
+        // 2 regions scanned with two entries for first region as it moved in b/w scan
+        Assert.assertEquals(3, byRegion.size());
+
+        long actualRowsInMovedRegion = 0;
+        Set<ServerName> serversForMovedRegion = new HashSet<>();
+        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> regionEntry : byRegion
+          .entrySet()) {
+          ScanMetricsRegionInfo region = regionEntry.getKey();
+          Map<String, Long> counters = regionEntry.getValue();
+          if (region.getEncodedRegionName().equals(movedRegionName)) {
+            actualRowsInMovedRegion += counters.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
+            serversForMovedRegion.add(region.getServerName());
+            Assert.assertEquals(1, (long) counters.get(RPC_RETRIES_METRIC_NAME));
+          }
+          Assert.assertEquals(1, (long) counters.get(REGIONS_SCANNED_METRIC_NAME));
+        }
+        Assert.assertEquals(expectedRowsInMovedRegion, actualRowsInMovedRegion);
+        Assert.assertEquals(2, serversForMovedRegion.size());
+      }
+    } finally {
+      TEST_UTIL.deleteTable(tableName);
+    }
+  }
+
+  /**
+   * Splits the region being scanned right after the first row has been read and checks that the
+   * per-region metrics contain one entry each for the parent and the two child regions, whose
+   * row counts add up to the rows in the scan range.
+   */
+  @Test
+  public void testScanMetricsByRegionWithRegionSplit() throws Exception {
+    TableName tableName = TableName.valueOf(
+      TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionSplit");
+    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
+      TEST_UTIL.loadTable(table, CF);
+
+      // Scan 1 region with start row key: bbb
+      byte[] bbb = Bytes.toBytes("bbb");
+      byte[] bmw = Bytes.toBytes("bmw");
+      byte[] ccb = Bytes.toBytes("ccb");
+
+      // Count the rows loaded by loadTable() (HBaseTestingUtil.ROWS) that fall in [bbb, ccb]
+      long expectedRowsInRegion = 0;
+      for (byte[] loadedRow : HBaseTestingUtil.ROWS) {
+        boolean inRange =
+          Bytes.compareTo(loadedRow, bbb) >= 0 && Bytes.compareTo(loadedRow, ccb) <= 0;
+        if (inRange) {
+          expectedRowsInRegion++;
+        }
+      }
+
+      // Initialize scan
+      Scan scan = generateScan(bbb, ccb);
+      scan.setEnableScanMetricsByRegion(true);
+      scan.setMaxResultSize(8000);
+
+      Set<String> expectedRegionNames = new HashSet<>();
+      try (ResultScanner rs = table.getScanner(scan)) {
+        boolean regionSplit = false;
+        for (Result r : rs) {
+          // Split once, as soon as the first row has been returned, then drain the scanner
+          if (!regionSplit) {
+            for (byte[] regionName : splitRegion(tableName, bbb, bmw)) {
+              expectedRegionNames.add(Bytes.toString(regionName));
+            }
+            regionSplit = true;
+          }
+        }
+
+        Map<ScanMetricsRegionInfo, Map<String, Long>> byRegion =
+          rs.getScanMetrics().collectMetricsByRegion();
+        // 1 entry each for parent and two child regions
+        Assert.assertEquals(3, byRegion.size());
+
+        long actualRowsInRegion = 0;
+        long rpcRetriesCount = 0;
+        Set<String> actualRegionNames = new HashSet<>();
+        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> regionEntry : byRegion
+          .entrySet()) {
+          Map<String, Long> counters = regionEntry.getValue();
+          actualRowsInRegion += counters.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
+          actualRegionNames.add(regionEntry.getKey().getEncodedRegionName());
+          if (counters.get(RPC_RETRIES_METRIC_NAME) == 1) {
+            rpcRetriesCount++;
+          }
+          Assert.assertEquals(1, (long) counters.get(REGIONS_SCANNED_METRIC_NAME));
+        }
+        Assert.assertEquals(expectedRowsInRegion, actualRowsInRegion);
+        Assert.assertEquals(2, rpcRetriesCount);
+        Assert.assertEquals(expectedRegionNames, actualRegionNames);
+      }
+    } finally {
+      TEST_UTIL.deleteTable(tableName);
+    }
+  }
+
+  /**
+   * Merges the two regions in the scan range right after the first row has been read and checks
+   * that the per-region metrics contain one entry for the old region the first row came from
+   * and one for the merged region, whose row counts add up to the rows in the scan range.
+   */
+  @Test
+  public void testScanMetricsByRegionWithRegionMerge() throws Exception {
+    TableName tableName = TableName.valueOf(
+      TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionMerge");
+    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
+      TEST_UTIL.loadTable(table, CF);
+
+      // Scan 2 regions with start row keys: bbb and ccc
+      byte[] bbb = Bytes.toBytes("bbb");
+      byte[] ccc = Bytes.toBytes("ccc");
+      byte[] ddc = Bytes.toBytes("ddc");
+
+      // Count the rows loaded by loadTable() (HBaseTestingUtil.ROWS) that fall in [bbb, ddc]
+      long expectedRowsInRegions = 0;
+      for (byte[] loadedRow : HBaseTestingUtil.ROWS) {
+        boolean inRange =
+          Bytes.compareTo(loadedRow, bbb) >= 0 && Bytes.compareTo(loadedRow, ddc) <= 0;
+        if (inRange) {
+          expectedRowsInRegions++;
+        }
+      }
+
+      // Initialize scan
+      Scan scan = generateScan(bbb, ddc);
+      scan.setEnableScanMetricsByRegion(true);
+      scan.setMaxResultSize(8000);
+
+      Set<String> expectedRegionNames = new HashSet<>();
+      String mergedRegionEncodedName = null;
+      try (ResultScanner rs = table.getScanner(scan)) {
+        boolean regionsMerged = false;
+        for (Result r : rs) {
+          // Merge once, as soon as the first row has been returned, then drain the scanner
+          if (!regionsMerged) {
+            List<byte[]> regionNames = mergeRegions(tableName, bbb, ccc);
+            // Entry with index 2 is the encoded region name of merged region
+            mergedRegionEncodedName = Bytes.toString(regionNames.get(2));
+            for (byte[] regionName : regionNames) {
+              expectedRegionNames.add(Bytes.toString(regionName));
+            }
+            regionsMerged = true;
+          }
+        }
+
+        Map<ScanMetricsRegionInfo, Map<String, Long>> byRegion =
+          rs.getScanMetrics().collectMetricsByRegion();
+        // 1 entry each for old region from which first row was scanned and new merged region
+        Assert.assertEquals(2, byRegion.size());
+
+        long actualRowsInRegions = 0;
+        boolean sawMergedRegion = false;
+        Set<String> actualRegionNames = new HashSet<>();
+        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> regionEntry : byRegion
+          .entrySet()) {
+          String encodedRegionName = regionEntry.getKey().getEncodedRegionName();
+          Map<String, Long> counters = regionEntry.getValue();
+          actualRowsInRegions += counters.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
+          actualRegionNames.add(encodedRegionName);
+          if (encodedRegionName.equals(mergedRegionEncodedName)) {
+            sawMergedRegion = true;
+          }
+          Assert.assertEquals(1, (long) counters.get(RPC_RETRIES_METRIC_NAME));
+          Assert.assertEquals(1, (long) counters.get(REGIONS_SCANNED_METRIC_NAME));
+        }
+        Assert.assertEquals(expectedRowsInRegions, actualRowsInRegions);
+        Assert.assertTrue(expectedRegionNames.containsAll(actualRegionNames));
+        Assert.assertTrue(sawMergedRegion);
+      }
+    } finally {
+      TEST_UTIL.deleteTable(tableName);
+    }
+  }
+
+  /**
+   * Builds a task that, for as long as the latch has not been counted down, repeatedly collects
+   * the per-region scan metrics, merges them into the given collection and then sleeps for a
+   * random duration below 10 ms.
+   * @param scanMetrics                   metrics to collect from
+   * @param scanMetricsByRegionCollection destination into which collected metrics are merged
+   * @param latch                         the task stops once the count of this latch reaches 0
+   * @return the periodic collector task
+   */
+  private Runnable getPeriodicScanMetricsCollector(ScanMetrics scanMetrics,
+    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegionCollection,
+    CountDownLatch latch) {
+    return () -> {
+      try {
+        while (latch.getCount() > 0) {
+          mergeScanMetricsByRegion(scanMetrics.collectMetricsByRegion(),
+            scanMetricsByRegionCollection);
+          Thread.sleep(RAND.nextInt(10));
+        }
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag so that the owner of the thread can still observe it
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    };
+  }
+
+  /**
+   * Adds every metric value of {@code srcMap} to the value stored for the same region and metric
+   * name in {@code dstMap}. Regions or metric names missing from {@code dstMap} are created. The
+   * maps held by {@code srcMap} are only read; {@code dstMap} never keeps a reference to them.
+   * @param srcMap per-region metrics to add
+   * @param dstMap per-region metrics accumulated so far; modified in place
+   */
+  private void mergeScanMetricsByRegion(Map<ScanMetricsRegionInfo, Map<String, Long>> srcMap,
+    Map<ScanMetricsRegionInfo, Map<String, Long>> dstMap) {
+    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : srcMap.entrySet()) {
+      // Accumulate into a map owned by dstMap so that the source map is never aliased or mutated
+      Map<String, Long> dstMetricsMap =
+        dstMap.computeIfAbsent(entry.getKey(), regionInfo -> new HashMap<>());
+      for (Map.Entry<String, Long> metricEntry : entry.getValue().entrySet()) {
+        // merge() starts from the new value when the metric is not present yet
+        dstMetricsMap.merge(metricEntry.getKey(), metricEntry.getValue(), Long::sum);
+      }
+    }
+  }
+
+  /**
+   * Moves the region containing the given row from its current region server to some other
+   * region server and asserts that the hosting server actually changed.
+   * @param tableName Table to which the region to be moved belongs.
+   * @param startRow  Row key used to look up the region to be moved.
+   * @return Encoded region name of the region which was moved.
+   */
+  private byte[] moveRegion(TableName tableName, byte[] startRow) throws IOException {
+    Admin admin = TEST_UTIL.getAdmin();
+    // RegionLocator is Closeable, so release it as soon as the lookups are done
+    try (RegionLocator regionLocator = CONN.getRegionLocator(tableName)) {
+      HRegionLocation loc = regionLocator.getRegionLocation(startRow, true);
+      byte[] encodedRegionName = loc.getRegion().getEncodedNameAsBytes();
+      ServerName initialServerName = loc.getServerName();
+
+      admin.move(encodedRegionName);
+
+      ServerName finalServerName = regionLocator.getRegionLocation(startRow, true).getServerName();
+
+      // Assert that region actually moved
+      Assert.assertNotEquals(initialServerName, finalServerName);
+      return encodedRegionName;
+    }
+  }
+
+  /**
+   * Splits the region containing the given start row at the split key provided and waits for
+   * the split to finish. Region names are compared by content, not by array identity.
+   * @param tableName Table name of region to be split.
+   * @param startRow  Start row key of the region to be split.
+   * @param splitKey  Split key for splitting the region; must lie in the same region as
+   *                  {@code startRow}.
+   * @return List of encoded region names with first element being parent region followed by two
+   *         child regions.
+   */
+  private List<byte[]> splitRegion(TableName tableName, byte[] startRow, byte[] splitKey)
+    throws IOException {
+    Admin admin = TEST_UTIL.getAdmin();
+    // RegionLocator is Closeable, so release it as soon as the lookups are done
+    try (RegionLocator regionLocator = CONN.getRegionLocator(tableName)) {
+      HRegionLocation topLoc = regionLocator.getRegionLocation(startRow, true);
+      byte[] initialEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
+      ServerName initialTopServerName = topLoc.getServerName();
+      HRegionLocation bottomLoc = regionLocator.getRegionLocation(splitKey, true);
+      byte[] initialEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
+      ServerName initialBottomServerName = bottomLoc.getServerName();
+
+      // Assert region is ready for split: both keys resolve to the same region on the same server
+      Assert.assertEquals(initialTopServerName, initialBottomServerName);
+      Assert.assertArrayEquals(initialEncodedTopRegionName, initialEncodedBottomRegionName);
+
+      FutureUtils.get(admin.splitRegionAsync(initialEncodedTopRegionName, splitKey));
+
+      topLoc = regionLocator.getRegionLocation(startRow, true);
+      byte[] finalEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
+      bottomLoc = regionLocator.getRegionLocation(splitKey, true);
+      byte[] finalEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
+
+      // Assert that region split is complete. assertNotEquals() on arrays only compares
+      // references and would pass for any two distinct array objects, so compare contents.
+      Assert.assertFalse(Arrays.equals(finalEncodedTopRegionName, finalEncodedBottomRegionName));
+      Assert.assertFalse(Arrays.equals(initialEncodedTopRegionName, finalEncodedBottomRegionName));
+      Assert.assertFalse(Arrays.equals(initialEncodedBottomRegionName, finalEncodedTopRegionName));
+
+      return Arrays.asList(initialEncodedTopRegionName, finalEncodedTopRegionName,
+        finalEncodedBottomRegionName);
+    }
+  }
+
+  /**
+   * Merges the two regions containing the given rows and waits for the merge to finish. Ensures
+   * that the regions to be merged are adjacent regions. Region names are compared by content,
+   * not by array identity.
+   * @param tableName    Table name of regions to be merged.
+   * @param topRegion    Start row key of first region for merging.
+   * @param bottomRegion Start row key of second region for merging.
+   * @return List of encoded region names with first two elements being original regions followed
+   *         by the merged region.
+   */
+  private List<byte[]> mergeRegions(TableName tableName, byte[] topRegion, byte[] bottomRegion)
+    throws IOException {
+    Admin admin = TEST_UTIL.getAdmin();
+    // RegionLocator is Closeable, so release it as soon as the lookups are done
+    try (RegionLocator regionLocator = CONN.getRegionLocator(tableName)) {
+      HRegionLocation topLoc = regionLocator.getRegionLocation(topRegion, true);
+      byte[] initialEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
+      String initialTopRegionEndKey = Bytes.toString(topLoc.getRegion().getEndKey());
+      HRegionLocation bottomLoc = regionLocator.getRegionLocation(bottomRegion, true);
+      byte[] initialEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
+      String initialBottomRegionStartKey = Bytes.toString(bottomLoc.getRegion().getStartKey());
+
+      // Assert that regions are ready to be merged: two different but adjacent regions.
+      // assertNotEquals() on arrays only compares references, so compare contents instead.
+      Assert
+        .assertFalse(Arrays.equals(initialEncodedTopRegionName, initialEncodedBottomRegionName));
+      Assert.assertEquals(initialBottomRegionStartKey, initialTopRegionEndKey);
+
+      FutureUtils.get(admin.mergeRegionsAsync(
+        new byte[][] { initialEncodedTopRegionName, initialEncodedBottomRegionName }, false));
+
+      topLoc = regionLocator.getRegionLocation(topRegion, true);
+      byte[] finalEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
+      bottomLoc = regionLocator.getRegionLocation(bottomRegion, true);
+      byte[] finalEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
+
+      // Assert regions have been merged successfully: both keys now resolve to one new region
+      Assert.assertArrayEquals(finalEncodedTopRegionName, finalEncodedBottomRegionName);
+      Assert.assertFalse(Arrays.equals(initialEncodedTopRegionName, finalEncodedTopRegionName));
+      Assert.assertFalse(Arrays.equals(initialEncodedBottomRegionName, finalEncodedTopRegionName));
+
+      return Arrays.asList(initialEncodedTopRegionName, initialEncodedBottomRegionName,
+        finalEncodedTopRegionName);
+    }
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
index 4f921540a17..621ecc08778 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
@@ -17,9 +17,13 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
+import static 
org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -31,6 +35,8 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.StartTestingClusterOption;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
 import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -244,6 +250,84 @@ public class TestTableSnapshotScanner {
     testScanner(UTIL, "testWithMultiRegion", 20, true);
   }
 
+  /**
+   * Creates a table and a snapshot named after the running test method, scans
+   * the snapshot with a {@code TableSnapshotScanner} using start row
+   * {@code bbb} and stop row {@code endKey}, checks the scan with
+   * {@code verifyScanner} and returns the scanner's scan metrics. The snapshot
+   * and the table are deleted in the {@code finally} block, whether or not the
+   * scan succeeded.
+   * @param enableScanMetrics         passed to {@code setScanMetricsEnabled}
+   * @param enableScanMetricsByRegion passed to
+   *                                  {@code setEnableScanMetricsByRegion}
+   * @param endKey                    stop row of the scan
+   * @return whatever {@code getScanMetrics()} reports after the scan
+   */
+  private ScanMetrics createTableSnapshotScannerAndGetScanMetrics(boolean 
enableScanMetrics,
+    boolean enableScanMetricsByRegion, byte[] endKey) throws Exception {
+    TableName tableName = TableName.valueOf(name.getMethodName() + "_TABLE");
+    String snapshotName = name.getMethodName() + "_SNAPSHOT";
+    try {
+      createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
+      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
+      Scan scan = new Scan().withStartRow(bbb).withStopRow(endKey);
+      scan.setScanMetricsEnabled(enableScanMetrics);
+      scan.setEnableScanMetricsByRegion(enableScanMetricsByRegion);
+      Configuration conf = UTIL.getConfiguration();
+
+      // NOTE(review): snapshotScanner is never closed in this method; confirm
+      // whether verifyScanner closes it, otherwise it should be closed here.
+      TableSnapshotScanner snapshotScanner =
+        new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
+      verifyScanner(snapshotScanner, bbb, endKey);
+      return snapshotScanner.getScanMetrics();
+    } finally {
+      UTIL.getAdmin().deleteSnapshot(snapshotName);
+      UTIL.deleteTable(tableName);
+    }
+  }
+
+  /**
+   * With both scan metrics and per-region scan metrics disabled, the snapshot
+   * scanner is expected to return {@code null} scan metrics.
+   */
+  @Test
+  public void testScanMetricsDisabled() throws Exception {
+    ScanMetrics scanMetrics = 
createTableSnapshotScannerAndGetScanMetrics(false, false, yyy);
+    Assert.assertNull(scanMetrics);
+  }
+
+  /**
+   * With scan metrics enabled and per-region metrics disabled, the rows
+   * scanned metric must equal the number of {@code HBaseTestingUtil.ROWS}
+   * entries that fall in the range {@code [bbb, yyy)}.
+   */
+  @Test
+  public void testScanMetricsWithScanMetricsByRegionDisabled() throws 
Exception {
+    ScanMetrics scanMetrics = 
createTableSnapshotScannerAndGetScanMetrics(true, false, yyy);
+    Assert.assertNotNull(scanMetrics);
+    // Expected count: rows >= bbb and < yyy, matching the scan's row range
+    int rowsScanned = 0;
+    for (byte[] row : HBaseTestingUtil.ROWS) {
+      if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, yyy) < 0) {
+        rowsScanned++;
+      }
+    }
+    Map<String, Long> metricsMap = scanMetrics.getMetricsMap();
+    Assert.assertEquals(rowsScanned, (long) 
metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+  }
+
+  /**
+   * With per-region scan metrics enabled and a scan from {@code bbb} to
+   * {@code bbc}, exactly one region entry is expected. That entry must have a
+   * null server name, a non-null encoded region name, and a value of 1 for
+   * both the regions scanned and the rows scanned metrics.
+   */
+  @Test
+  public void testScanMetricsByRegionForSingleRegion() throws Exception {
+    // Scan single row with row key bbb
+    byte[] bbc = Bytes.toBytes("bbc");
+    ScanMetrics scanMetrics = 
createTableSnapshotScannerAndGetScanMetrics(true, true, bbc);
+    Assert.assertNotNull(scanMetrics);
+    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+      scanMetrics.collectMetricsByRegion();
+    Assert.assertEquals(1, scanMetricsByRegion.size());
+    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : 
scanMetricsByRegion
+      .entrySet()) {
+      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+      Map<String, Long> metricsMap = entry.getValue();
+      // The test expects no server name for a snapshot scan
+      Assert.assertNull(scanMetricsRegionInfo.getServerName());
+      Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
+      Assert.assertEquals(1, (long) 
metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+      Assert.assertEquals(1, (long) 
metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+    }
+  }
+
+  /**
+   * With per-region scan metrics enabled and a scan from {@code bbb} to
+   * {@code yyy}, every region entry must have a null server name, a non-null
+   * encoded region name, and a regions scanned metric of 1.
+   */
+  @Test
+  public void testScanMetricsByRegionForMultiRegion() throws Exception {
+    ScanMetrics scanMetrics = 
createTableSnapshotScannerAndGetScanMetrics(true, true, yyy);
+    Assert.assertNotNull(scanMetrics);
+    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+      scanMetrics.collectMetricsByRegion();
+    // NOTE(review): the number of entries is not asserted here, so the loop
+    // below passes vacuously if the map is empty; consider asserting the size.
+    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : 
scanMetricsByRegion
+      .entrySet()) {
+      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+      Map<String, Long> metricsMap = entry.getValue();
+      Assert.assertNull(scanMetricsRegionInfo.getServerName());
+      Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
+      Assert.assertEquals(1, (long) 
metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+    }
+  }
+
   @Test
   public void testScannerWithRestoreScanner() throws Exception {
     TableName tableName = TableName.valueOf("testScanner");

Reply via email to