This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 89416ce17b7 HBASE-27355 Separate meta read requests from master and client (#7261)
89416ce17b7 is described below

commit 89416ce17b7f9a2d16a5ad37fc89d0c4da4b50d9
Author: Ruanhui <[email protected]>
AuthorDate: Mon Sep 15 09:47:30 2025 +0800

    HBASE-27355 Separate meta read requests from master and client (#7261)
    
    Co-authored-by: huiruan <[email protected]>
    
    Signed-off-by: Duo Zhang <[email protected]>
    Reviewed-by: Aman Poonia <[email protected]>
---
 .../org/apache/hadoop/hbase/MetaTableAccessor.java | 37 ++++++++-----
 .../hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java   | 25 ++++++++-
 .../hadoop/hbase/ipc/RWQueueRpcExecutor.java       |  4 ++
 .../org/apache/hadoop/hbase/ipc/RpcExecutor.java   |  6 ++-
 .../RSAnnotationReadingPriorityFunction.java       |  6 ++-
 .../hadoop/hbase/ipc/TestSimpleRpcScheduler.java   | 60 ++++++++++++----------
 6 files changed, 93 insertions(+), 45 deletions(-)

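Context for the new tuning knobs in this change: it introduces hbase.ipc.server.metacallqueue.handler.factor (default 0.5) and retunes the meta read/scan shares (defaults 0.8 and 0.2). A minimal sketch of setting these programmatically, mirroring how the updated test configures them; the class name and the values below are illustrative, not recommendations:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.ipc.MetaRWQueueRpcExecutor;

    public class MetaQueueTuningSketch {
      public static Configuration tunedConf() {
        Configuration conf = HBaseConfiguration.create();
        // One call queue per meta handler (default after this change is 0.5).
        conf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
        // Portion of meta queues serving reads rather than writes (default 0.8).
        conf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
        // Portion set aside for the scan queues, which now also take internal
        // (INTERNAL_READ_QOS) meta reads (default 0.2).
        conf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
        return conf;
      }
    }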
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index 98750d38a7c..05b049e27db 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.RowFilter;
 import org.apache.hadoop.hbase.filter.SubstringComparator;
 import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.regionserver.RSAnnotationReadingPriorityFunction;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
@@ -185,6 +186,7 @@ public final class MetaTableAccessor {
     }
     Get get = new Get(row);
     get.addFamily(HConstants.CATALOG_FAMILY);
+    get.setPriority(RSAnnotationReadingPriorityFunction.INTERNAL_READ_QOS);
     Result r;
     try (Table t = getMetaHTable(connection)) {
       r = t.get(get);
@@ -213,6 +215,7 @@ public final class MetaTableAccessor {
     throws IOException {
     Get get = new Get(CatalogFamilyFormat.getMetaKeyForRegion(ri));
     get.addFamily(HConstants.CATALOG_FAMILY);
+    get.setPriority(RSAnnotationReadingPriorityFunction.INTERNAL_READ_QOS);
     try (Table t = getMetaHTable(connection)) {
       return t.get(get);
     }
@@ -226,11 +229,7 @@ public final class MetaTableAccessor {
    */
  public static Result getRegionResult(Connection connection, RegionInfo regionInfo)
     throws IOException {
-    Get get = new Get(CatalogFamilyFormat.getMetaKeyForRegion(regionInfo));
-    get.addFamily(HConstants.CATALOG_FAMILY);
-    try (Table t = getMetaHTable(connection)) {
-      return t.get(get);
-    }
+    return getCatalogFamilyRow(connection, regionInfo);
   }
 
   /**
@@ -341,6 +340,7 @@ public final class MetaTableAccessor {
       scan.setReadType(Scan.ReadType.PREAD);
     }
     scan.setCaching(scannerCaching);
+    scan.setPriority(RSAnnotationReadingPriorityFunction.INTERNAL_READ_QOS);
     return scan;
   }
 
@@ -368,7 +368,7 @@ public final class MetaTableAccessor {
     final boolean excludeOfflinedSplitParents) throws IOException {
     if (tableName != null && tableName.equals(TableName.META_TABLE_NAME)) {
       throw new IOException(
-        "This method can't be used to locate meta regions;" + " use 
MetaTableLocator instead");
+        "This method can't be used to locate meta regions; use 
MetaTableLocator instead");
     }
    // Make a version of CollectingVisitor that collects RegionInfo and ServerAddress
     ClientMetaTableAccessor.CollectRegionLocationsVisitor visitor =
@@ -385,10 +385,10 @@ public final class MetaTableAccessor {
       if (r == null || r.isEmpty()) {
         return true;
       }
-      LOG.info("fullScanMetaAndPrint.Current Meta Row: " + r);
+      LOG.info("fullScanMetaAndPrint.Current Meta Row: {}", r);
       TableState state = CatalogFamilyFormat.getTableState(r);
       if (state != null) {
-        LOG.info("fullScanMetaAndPrint.Table State={}" + state);
+        LOG.info("fullScanMetaAndPrint.Table State={}", state);
       } else {
         RegionLocations locations = CatalogFamilyFormat.getRegionLocations(r);
         if (locations == null) {
@@ -461,6 +461,15 @@ public final class MetaTableAccessor {
     scanMeta(connection, startRow, stopRow, type, null, maxRows, visitor);
   }
 
+  /**
+   * Performs a scan of META table.
+   * @param connection connection we're using
+   * @param startRow   Where to start the scan. Pass null if want to begin scan at first row.
+   * @param stopRow    Where to stop the scan. Pass null if want to scan all rows from the start one
+   * @param type       scanned part of meta
+   * @param maxRows    maximum rows to return
+   * @param visitor    Visitor invoked against each row.
+   */
  public static void scanMeta(Connection connection, @Nullable final byte[] startRow,
    @Nullable final byte[] stopRow, QueryType type, @Nullable Filter filter, int maxRows,
     final ClientMetaTableAccessor.Visitor visitor) throws IOException {
@@ -481,9 +490,11 @@ public final class MetaTableAccessor {
     }
 
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Scanning META" + " starting at row=" + 
Bytes.toStringBinary(startRow)
-        + " stopping at row=" + Bytes.toStringBinary(stopRow) + " for max=" + 
rowUpperLimit
-        + " with caching=" + scan.getCaching());
+      LOG.trace(
+        "Scanning META starting at row={} stopping at row={} for max={} with 
caching={} "
+          + "priority={}",
+        Bytes.toStringBinary(startRow), Bytes.toStringBinary(stopRow), rowUpperLimit,
+        scan.getCaching(), scan.getPriority());
     }
 
     int currentRow = 0;
@@ -912,7 +923,7 @@ public final class MetaTableAccessor {
     addRegionInfo(put, regionInfo);
     addLocation(put, sn, openSeqNum, regionInfo.getReplicaId());
     putToMetaTable(connection, put);
-    LOG.info("Updated row {} with server=", 
regionInfo.getRegionNameAsString(), sn);
+    LOG.info("Updated row {} with server = {}", 
regionInfo.getRegionNameAsString(), sn);
   }
 
  public static Put addRegionInfo(final Put p, final RegionInfo hri) throws IOException {
@@ -937,7 +948,7 @@
      .add(builder.clear().setRow(p.getRow()).setFamily(HConstants.CATALOG_FAMILY)
         .setQualifier(CatalogFamilyFormat.getStartCodeColumn(replicaId))
         .setTimestamp(p.getTimestamp()).setType(Cell.Type.Put)
-        .setValue(Bytes.toBytes(sn.getStartcode())).build())
+        .setValue(Bytes.toBytes(sn.getStartCode())).build())
      .add(builder.clear().setRow(p.getRow()).setFamily(HConstants.CATALOG_FAMILY)
        .setQualifier(CatalogFamilyFormat.getSeqNumColumn(replicaId)).setTimestamp(p.getTimestamp())
         .setType(Type.Put).setValue(Bytes.toBytes(openSeqNum)).build());
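The net effect of the MetaTableAccessor changes above is that internally issued meta Gets and Scans now carry the new INTERNAL_READ_QOS priority, so the region server can tell master/internal meta reads apart from client meta reads. A rough sketch of the pattern, paraphrasing the accessor rather than quoting it; the class name, row key, and caching value are hypothetical:

    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.regionserver.RSAnnotationReadingPriorityFunction;
    import org.apache.hadoop.hbase.util.Bytes;

    public class InternalMetaReadSketch {
      // Same shape as the Gets built in MetaTableAccessor (e.g. getCatalogFamilyRow).
      static Get internalMetaGet() {
        Get get = new Get(Bytes.toBytes("test-table,,1.abcdef"));  // hypothetical meta row key
        get.addFamily(HConstants.CATALOG_FAMILY);
        get.setPriority(RSAnnotationReadingPriorityFunction.INTERNAL_READ_QOS);
        return get;
      }

      // getMetaScan now tags its Scan with the same priority.
      static Scan internalMetaScan() {
        Scan scan = new Scan();
        scan.setCaching(100);  // example caching value
        scan.setPriority(RSAnnotationReadingPriorityFunction.INTERNAL_READ_QOS);
        return scan;
      }
    }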
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java
index a86e6554b1c..97c3a876525 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.ipc;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.regionserver.RSAnnotationReadingPriorityFunction;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
@@ -32,7 +33,10 @@ public class MetaRWQueueRpcExecutor extends RWQueueRpcExecutor {
     "hbase.ipc.server.metacallqueue.read.ratio";
   public static final String META_CALL_QUEUE_SCAN_SHARE_CONF_KEY =
     "hbase.ipc.server.metacallqueue.scan.ratio";
-  public static final float DEFAULT_META_CALL_QUEUE_READ_SHARE = 0.9f;
+  public static final String META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
+    "hbase.ipc.server.metacallqueue.handler.factor";
+  public static final float DEFAULT_META_CALL_QUEUE_READ_SHARE = 0.8f;
+  private static final float DEFAULT_META_CALL_QUEUE_SCAN_SHARE = 0.2f;
 
  public MetaRWQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
    final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
@@ -46,6 +50,23 @@ public class MetaRWQueueRpcExecutor extends RWQueueRpcExecutor {
 
   @Override
   protected float getScanShare(final Configuration conf) {
-    return conf.getFloat(META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
+    return conf.getFloat(META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, DEFAULT_META_CALL_QUEUE_SCAN_SHARE);
+  }
+
+  @Override
+  public boolean dispatch(CallRunner callTask) {
+    RpcCall call = callTask.getRpcCall();
+    int level = call.getHeader().getPriority();
+    final boolean toWriteQueue = isWriteRequest(call.getHeader(), call.getParam());
+    // dispatch client system read request to read handlers
+    // dispatch internal system read request to scan handlers
+    final boolean toScanQueue =
+      getNumScanQueues() > 0 && level == RSAnnotationReadingPriorityFunction.INTERNAL_READ_QOS;
+    return dispatchTo(toWriteQueue, toScanQueue, callTask);
+  }
+
+  @Override
+  protected float getCallQueueHandlerFactor(Configuration conf) {
+    return conf.getFloat(META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.5f);
   }
 }
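A rough back-of-the-envelope for what the new defaults mean for the meta executor: with, say, 10 meta handlers, a handler factor of 0.5 yields about 5 call queues; a read share of 0.8 leaves roughly 1 write queue and 4 read-side queues, and a scan share of 0.2 carves about 1 of those out as a scan queue, which the overridden dispatch() now feeds with INTERNAL_READ_QOS reads. The exact rounding lives in RWQueueRpcExecutor; the snippet below is only an illustration of the proportions, with hypothetical numbers:

    public class MetaQueueSplitSketch {
      public static void main(String[] args) {
        // Hypothetical handler count; the real split is computed inside RWQueueRpcExecutor.
        int metaHandlers = 10;
        float handlerFactor = 0.5f; // new meta default (general RpcExecutor default stays 0.1)
        float readShare = 0.8f;     // DEFAULT_META_CALL_QUEUE_READ_SHARE after this change
        float scanShare = 0.2f;     // DEFAULT_META_CALL_QUEUE_SCAN_SHARE after this change

        int queues = Math.max(1, Math.round(metaHandlers * handlerFactor));     // ~5
        int writeQueues = Math.max(1, Math.round(queues * (1 - readShare)));    // ~1
        int readSide = queues - writeQueues;                                    // ~4
        int scanQueues = Math.max(1, (int) Math.floor(readSide * scanShare));   // ~1, takes INTERNAL_READ_QOS reads
        System.out.printf("queues=%d write=%d read=%d scan=%d%n",
          queues, writeQueues, readSide - scanQueues, scanQueues);
      }
    }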
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index 298a9fc3aeb..70a7b74b8e2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -297,4 +297,8 @@ public class RWQueueRpcExecutor extends RpcExecutor {
       ((ConfigurationObserver) balancer).onConfigurationChange(conf);
     }
   }
+
+  protected int getNumScanQueues() {
+    return numScanQueues;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 7e5bdfcc7d6..15c9afe030c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -130,7 +130,7 @@ public abstract class RpcExecutor {
     this.conf = conf;
     this.abortable = abortable;
 
-    float callQueuesHandlersFactor = this.conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.1f);
+    float callQueuesHandlersFactor = getCallQueueHandlerFactor(conf);
     if (
       Float.compare(callQueuesHandlersFactor, 1.0f) > 0
         || Float.compare(0.0f, callQueuesHandlersFactor) > 0
@@ -468,4 +468,8 @@ public abstract class RpcExecutor {
       }
     }
   }
+
+  protected float getCallQueueHandlerFactor(Configuration conf) {
+    return conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.1f);
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSAnnotationReadingPriorityFunction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSAnnotationReadingPriorityFunction.java
index 1197f7b5359..94c76cf55a4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSAnnotationReadingPriorityFunction.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSAnnotationReadingPriorityFunction.java
@@ -46,7 +46,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader
  * Priority function specifically for the region server.
  */
 @InterfaceAudience.Private
-class RSAnnotationReadingPriorityFunction extends AnnotationReadingPriorityFunction<RSRpcServices> {
+public class RSAnnotationReadingPriorityFunction
+  extends AnnotationReadingPriorityFunction<RSRpcServices> {
 
   private static final Logger LOG =
     LoggerFactory.getLogger(RSAnnotationReadingPriorityFunction.class);
@@ -54,6 +55,9 @@ class RSAnnotationReadingPriorityFunction extends AnnotationReadingPriorityFunct
   /** Used to control the scan delay, currently sqrt(numNextCall * weight) */
  public static final String SCAN_VTIME_WEIGHT_CONF_KEY = "hbase.ipc.server.scan.vtime.weight";
 
+  // QOS for internal meta read requests
+  public static final int INTERNAL_READ_QOS = 250;
+
   @SuppressWarnings("unchecked")
   private final Class<? extends Message>[] knownArgumentClasses =
    new Class[] { GetRegionInfoRequest.class, GetStoreFileRequest.class, CloseRegionRequest.class,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 19aa46a0d62..eed7d98d735 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
+import org.apache.hadoop.hbase.regionserver.RSAnnotationReadingPriorityFunction;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RPCTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -108,7 +109,7 @@ public class TestSimpleRpcScheduler {
    RpcScheduler scheduler = new SimpleRpcScheduler(conf, 10, 0, 0, qosFunction, 0);
     scheduler.init(CONTEXT);
     scheduler.start();
-    CallRunner task = createMockTask();
+    CallRunner task = createMockTask(HConstants.NORMAL_QOS);
     task.setStatus(new MonitoredRPCHandlerImpl("test"));
     scheduler.dispatch(task);
     verify(task, timeout(10000)).run();
@@ -163,7 +164,7 @@ public class TestSimpleRpcScheduler {
 
     int totalCallMethods = 10;
     for (int i = totalCallMethods; i > 0; i--) {
-      CallRunner task = createMockTask();
+      CallRunner task = createMockTask(HConstants.NORMAL_QOS);
       task.setStatus(new MonitoredRPCHandlerImpl("test"));
       scheduler.dispatch(task);
     }
@@ -185,9 +186,9 @@ public class TestSimpleRpcScheduler {
 
   @Test
   public void testHandlerIsolation() throws IOException, InterruptedException {
-    CallRunner generalTask = createMockTask();
-    CallRunner priorityTask = createMockTask();
-    CallRunner replicationTask = createMockTask();
+    CallRunner generalTask = createMockTask(HConstants.NORMAL_QOS);
+    CallRunner priorityTask = createMockTask(HConstants.HIGH_QOS + 1);
+    CallRunner replicationTask = createMockTask(HConstants.REPLICATION_QOS);
    List<CallRunner> tasks = ImmutableList.of(generalTask, priorityTask, replicationTask);
    Map<CallRunner, Integer> qos = ImmutableMap.of(generalTask, 0, priorityTask,
       HConstants.HIGH_QOS + 1, replicationTask, HConstants.REPLICATION_QOS);
@@ -227,10 +228,12 @@ public class TestSimpleRpcScheduler {
     assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size());
   }
 
-  private CallRunner createMockTask() {
+  private CallRunner createMockTask(int priority) {
     ServerCall call = mock(ServerCall.class);
     CallRunner task = mock(CallRunner.class);
+    RequestHeader header = RequestHeader.newBuilder().setPriority(priority).build();
     when(task.getRpcCall()).thenReturn(call);
+    when(call.getHeader()).thenReturn(header);
     return task;
   }
 
@@ -707,7 +710,7 @@ public class TestSimpleRpcScheduler {
   @Test
   public void testMetaRWScanQueues() throws Exception {
     Configuration schedConf = HBaseConfiguration.create();
-    schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
+    schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
    schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
    schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
 
@@ -728,36 +731,37 @@ public class TestSimpleRpcScheduler {
       when(putCall.getHeader()).thenReturn(putHead);
       when(putCall.getParam()).thenReturn(putCall.param);
 
-      CallRunner getCallTask = mock(CallRunner.class);
-      ServerCall getCall = mock(ServerCall.class);
-      RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
-      when(getCallTask.getRpcCall()).thenReturn(getCall);
-      when(getCall.getHeader()).thenReturn(getHead);
-
-      CallRunner scanCallTask = mock(CallRunner.class);
-      ServerCall scanCall = mock(ServerCall.class);
-      scanCall.param = ScanRequest.newBuilder().build();
-      RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
-      when(scanCallTask.getRpcCall()).thenReturn(scanCall);
-      when(scanCall.getHeader()).thenReturn(scanHead);
-      when(scanCall.getParam()).thenReturn(scanCall.param);
+      CallRunner clientReadCallTask = mock(CallRunner.class);
+      ServerCall clientReadCall = mock(ServerCall.class);
+      RequestHeader clientReadHead = RequestHeader.newBuilder().setMethodName("get").build();
+      when(clientReadCallTask.getRpcCall()).thenReturn(clientReadCall);
+      when(clientReadCall.getHeader()).thenReturn(clientReadHead);
+
+      CallRunner internalReadCallTask = mock(CallRunner.class);
+      ServerCall internalReadCall = mock(ServerCall.class);
+      internalReadCall.param = ScanRequest.newBuilder().build();
+      RequestHeader masterReadHead = RequestHeader.newBuilder().setMethodName("scan")
+        .setPriority(RSAnnotationReadingPriorityFunction.INTERNAL_READ_QOS).build();
+      when(internalReadCallTask.getRpcCall()).thenReturn(internalReadCall);
+      when(internalReadCall.getHeader()).thenReturn(masterReadHead);
+      when(internalReadCall.getParam()).thenReturn(internalReadCall.param);
 
       ArrayList<Integer> work = new ArrayList<>();
       doAnswerTaskExecution(putCallTask, work, 1, 1000);
-      doAnswerTaskExecution(getCallTask, work, 2, 1000);
-      doAnswerTaskExecution(scanCallTask, work, 3, 1000);
+      doAnswerTaskExecution(clientReadCallTask, work, 2, 1000);
+      doAnswerTaskExecution(internalReadCallTask, work, 3, 1000);
 
       // There are 3 queues: [puts], [gets], [scans]
       // so the calls will be interleaved
       scheduler.dispatch(putCallTask);
       scheduler.dispatch(putCallTask);
       scheduler.dispatch(putCallTask);
-      scheduler.dispatch(getCallTask);
-      scheduler.dispatch(getCallTask);
-      scheduler.dispatch(getCallTask);
-      scheduler.dispatch(scanCallTask);
-      scheduler.dispatch(scanCallTask);
-      scheduler.dispatch(scanCallTask);
+      scheduler.dispatch(clientReadCallTask);
+      scheduler.dispatch(clientReadCallTask);
+      scheduler.dispatch(clientReadCallTask);
+      scheduler.dispatch(internalReadCallTask);
+      scheduler.dispatch(internalReadCallTask);
+      scheduler.dispatch(internalReadCallTask);
 
       while (work.size() < 6) {
         Thread.sleep(100);
