ndimiduk commented on a change in pull request #2570:
URL: https://github.com/apache/hbase/pull/2570#discussion_r509749819



##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaReplicaLoadBalanceReplicaSimpleChooser.java
##########
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_META_REPLICA_NUM;
+import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
+import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
+
+/**
+ * MetaReplicaLoadBalanceReplicaSimpleChooser implements a simple meta replica 
load balancing
+ * algorithm. It maintains a stale location cache for each table. Whenever 
client looks up meta,
+ * it first check if the row is the stale location cache, if yes, this means 
the the location from
+ * meta replica is stale, it will go to the primary meta to look up 
update-to-date location;
+ * otherwise, it will randomly pick up a meta replica region for meta lookup. 
When clients receive
+ * RegionNotServedException from region servers, it will add these region 
locations to the stale
+ * location cache. The stale cache will be cleaned up periodically by a chore.
+ */
+
+/**
+ * StaleLocationCacheEntry is the entry when a stale location is reported by 
an client.
+ */
+class StaleLocationCacheEntry {
+  // meta replica id where
+  private int metaReplicaId;
+
+  // timestamp in milliseconds
+  private long timestamp;
+
+  private byte[] endKey;
+
+  StaleLocationCacheEntry(final int metaReplicaId, final byte[] endKey) {
+    this.metaReplicaId = metaReplicaId;
+    this.endKey = endKey;
+    timestamp = System.currentTimeMillis();
+  }
+
+  public byte[] getEndKey() {
+    return this.endKey;
+  }
+
+  public int getMetaReplicaId() {
+    return this.metaReplicaId;
+  }
+  public long getTimestamp() {
+    return this.timestamp;
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this)
+      .append("endKey", endKey)
+      .append("metaReplicaId", metaReplicaId)
+      .append("timestamp", timestamp)
+      .toString();
+  }
+}
+
+/**
+ * A simple implementation of MetaReplicaLoadBalanceReplicaChooser.
+ *
+ * It follows a simple algorithm to choose a meta replica to go:
+ *
+ *  1. If there is no stale location entry for rows it looks up, it will 
randomly
+ *     pick a meta replica region to do lookup.
+ *  2. If the location from meta replica region is stale, client gets 
RegionNotServedException
+ *     from region server, in this case, it will create 
StaleLocationCacheEntry in
+ *     MetaReplicaLoadBalanceReplicaSimpleChooser.
+ *  3. When client tries to do meta lookup, it checks StaleLocationCache first 
for rows it tries to
+ *     lookup, if entry exists, it will go with primary meta region to do 
lookup; otherwise, it
+ *     will follow step 1.
+ *  4. A chore will periodically run to clean up cache entries in the 
StaleLocationCache.
+ */
+class MetaReplicaLoadBalanceReplicaSimpleChooser implements 
MetaReplicaLoadBalanceReplicaChooser {
+  private static final Logger LOG =
+    LoggerFactory.getLogger(MetaReplicaLoadBalanceReplicaSimpleChooser.class);
+  private final long STALE_CACHE_TIMEOUT_IN_MILLISECONDS = 3000; // 3 seconds
+  private final int STALE_CACHE_CLEAN_CHORE_INTERVAL = 1500; // 1.5 seconds
+
+  private final class StaleTableCache {
+    private final ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> 
cache =
+      new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
+  }
+
+  private final ConcurrentMap<TableName, StaleTableCache> staleCache;
+  private final int numOfMetaReplicas;
+  private final AsyncConnectionImpl conn;
+
+  MetaReplicaLoadBalanceReplicaSimpleChooser(final AsyncConnectionImpl conn) {
+    staleCache = new ConcurrentHashMap<>();
+    this.numOfMetaReplicas = conn.getConfiguration().getInt(
+      META_REPLICAS_NUM, DEFAULT_META_REPLICA_NUM);
+    this.conn = conn;
+    this.conn.getChoreService().scheduleChore(getCacheCleanupChore(this));
+  }
+
+  /**
+   * When a client runs into RegionNotServingException, it will call this 
method to

Review comment:
       Javadoc should be on the `interface`.

##########
File path: hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
##########
@@ -1128,6 +1128,9 @@
 
   /** Conf key for enabling meta replication */
   public static final String USE_META_REPLICAS = "hbase.meta.replicas.use";
+  public static final String META_REPLICAS_MODE = "hbase.meta.replicas.mode";

Review comment:
       What do you propose, @saintstack? `hbase.catalogue.replicas.mode`?

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaReplicaLoadBalanceReplicaSimpleChooser.java
##########
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_META_REPLICA_NUM;
+import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
+import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
+
+/**
+ * MetaReplicaLoadBalanceReplicaSimpleChooser implements a simple meta replica 
load balancing
+ * algorithm. It maintains a stale location cache for each table. Whenever 
client looks up meta,
+ * it first check if the row is the stale location cache, if yes, this means 
the the location from
+ * meta replica is stale, it will go to the primary meta to look up 
update-to-date location;
+ * otherwise, it will randomly pick up a meta replica region for meta lookup. 
When clients receive
+ * RegionNotServedException from region servers, it will add these region 
locations to the stale
+ * location cache. The stale cache will be cleaned up periodically by a chore.
+ */
+
+/**
+ * StaleLocationCacheEntry is the entry when a stale location is reported by 
an client.
+ */
+class StaleLocationCacheEntry {
+  // meta replica id where
+  private int metaReplicaId;
+
+  // timestamp in milliseconds
+  private long timestamp;
+
+  private byte[] endKey;
+
+  StaleLocationCacheEntry(final int metaReplicaId, final byte[] endKey) {
+    this.metaReplicaId = metaReplicaId;
+    this.endKey = endKey;
+    timestamp = System.currentTimeMillis();
+  }
+
+  public byte[] getEndKey() {
+    return this.endKey;
+  }
+
+  public int getMetaReplicaId() {
+    return this.metaReplicaId;
+  }
+  public long getTimestamp() {
+    return this.timestamp;
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this)
+      .append("endKey", endKey)
+      .append("metaReplicaId", metaReplicaId)
+      .append("timestamp", timestamp)
+      .toString();
+  }
+}
+
+/**
+ * A simple implementation of MetaReplicaLoadBalanceReplicaChooser.
+ *
+ * It follows a simple algorithm to choose a meta replica to go:
+ *
+ *  1. If there is no stale location entry for rows it looks up, it will 
randomly
+ *     pick a meta replica region to do lookup.
+ *  2. If the location from meta replica region is stale, client gets 
RegionNotServedException
+ *     from region server, in this case, it will create 
StaleLocationCacheEntry in
+ *     MetaReplicaLoadBalanceReplicaSimpleChooser.
+ *  3. When client tries to do meta lookup, it checks StaleLocationCache first 
for rows it tries to
+ *     lookup, if entry exists, it will go with primary meta region to do 
lookup; otherwise, it
+ *     will follow step 1.
+ *  4. A chore will periodically run to clean up cache entries in the 
StaleLocationCache.
+ */
+class MetaReplicaLoadBalanceReplicaSimpleChooser implements 
MetaReplicaLoadBalanceReplicaChooser {
+  private static final Logger LOG =
+    LoggerFactory.getLogger(MetaReplicaLoadBalanceReplicaSimpleChooser.class);
+  private final long STALE_CACHE_TIMEOUT_IN_MILLISECONDS = 3000; // 3 seconds
+  private final int STALE_CACHE_CLEAN_CHORE_INTERVAL = 1500; // 1.5 seconds
+
+  private final class StaleTableCache {
+    private final ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> 
cache =
+      new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
+  }
+
+  private final ConcurrentMap<TableName, StaleTableCache> staleCache;
+  private final int numOfMetaReplicas;
+  private final AsyncConnectionImpl conn;
+
+  MetaReplicaLoadBalanceReplicaSimpleChooser(final AsyncConnectionImpl conn) {
+    staleCache = new ConcurrentHashMap<>();
+    this.numOfMetaReplicas = conn.getConfiguration().getInt(
+      META_REPLICAS_NUM, DEFAULT_META_REPLICA_NUM);
+    this.conn = conn;
+    this.conn.getChoreService().scheduleChore(getCacheCleanupChore(this));
+  }
+
+  /**
+   * When a client runs into RegionNotServingException, it will call this 
method to
+   * update Chooser's internal state.
+   * @param loc the location which causes exception.
+   * @param fromMetaReplicaId the meta replica id where the location comes 
from.
+   */
+  public void updateCacheOnError(final HRegionLocation loc, final int 
fromMetaReplicaId) {
+    StaleTableCache tableCache =
+      computeIfAbsent(staleCache, loc.getRegion().getTable(), 
StaleTableCache::new);
+    byte[] startKey = loc.getRegion().getStartKey();
+    tableCache.cache.putIfAbsent(startKey,
+      new StaleLocationCacheEntry(fromMetaReplicaId, 
loc.getRegion().getEndKey()));
+    LOG.debug("Add entry to stale cache for table {} with startKey {}, {}",
+      loc.getRegion().getTable(), startKey, loc.getRegion().getEndKey());
+  }
+
+  /**
+   * When it does a meta lookup, it will call this method to find a meta 
replica to go.
+   * @param tablename  table name it looks up
+   * @param row   key it looks up.
+   * @param locateType locateType, Only BEFORE and CURRENT will be passed in.
+   * @return meta replica id
+   */
+  public int chooseReplicaToGo(final TableName tablename, final byte[] row,
+    final RegionLocateType locateType) {
+    StaleTableCache tableCache = staleCache.get(tablename);
+    int metaReplicaId = 1 + 
ThreadLocalRandom.current().nextInt(this.numOfMetaReplicas - 1);
+
+    // If there is no entry in StaleCache, pick a random meta replica id.
+    if (tableCache == null) {
+      return metaReplicaId;
+    }
+
+    Map.Entry<byte[], StaleLocationCacheEntry> entry;
+    boolean isEmptyStopRow = isEmptyStopRow(row);
+    // Only BEFORE and CURRENT are passed in.
+    if (locateType == RegionLocateType.BEFORE) {
+      entry = isEmptyStopRow ? tableCache.cache.lastEntry() : 
tableCache.cache.lowerEntry(row);
+    } else {
+      entry = tableCache.cache.floorEntry(row);
+    }
+
+    if (entry == null) {
+      return metaReplicaId;
+    }
+
+    // Check if the entry times out.
+    if ((System.currentTimeMillis() - entry.getValue().getTimestamp()) >=

Review comment:
       Please use `EnvironmentEdgeManager.currentTime()` here instead of 
`System.currentTimeMillis()`.
   
   Same nit: move the calculation out to a local variable to keep the 
if-condition concise.

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaReplicaLoadBalanceReplicaChooser.java
##########
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * There are two modes with meta replica support.
+ *   HighAvailable    - Client sends requests to the primary meta region 
first, within a
+ *                      configured amount of time, if  there is no response 
coming back,
+ *                      client sends requests to all replica regions and takes 
the first
+ *                      response.
+ *
+ *   LoadBalance      - Client sends requests to meta replica regions in a 
round-robin mode,
+ *                      if results from replica regions are stale, next time, 
client sends requests for
+ *                      these stable locations to the primary meta region. In 
this mode, scan
+ *                      requests are load balanced across all replica regions.
+ */
+enum MetaReplicaMode {
+  None,
+  HighAvailable,
+  LoadBalance
+}
+
+/**
+ * A Meta replica chooser decides which meta replica to go for scan requests.
+ */
+@InterfaceAudience.Private
+public interface MetaReplicaLoadBalanceReplicaChooser {
+
+  void updateCacheOnError(final HRegionLocation loc, final int 
fromMetaReplicaId);

Review comment:
       Calling it "meta" is fine at this point; the only other thing we could 
be balancing requests over is a user table, and there's a separate mechanism 
for that functionality (I haven't looked into why that functionality cannot be 
reused here, but I presume it pertains to the different means of 
configuration... which is a bit of a shame).
   
   If we need to use this to spread requests over replicas of other system 
tables, we can update the class name accordingly.

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaReplicaLoadBalanceReplicaSimpleChooser.java
##########
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_META_REPLICA_NUM;
+import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
+import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
+
+/**
+ * MetaReplicaLoadBalanceReplicaSimpleChooser implements a simple meta replica 
load balancing
+ * algorithm. It maintains a stale location cache for each table. Whenever 
client looks up meta,
+ * it first check if the row is the stale location cache, if yes, this means 
the the location from
+ * meta replica is stale, it will go to the primary meta to look up 
update-to-date location;
+ * otherwise, it will randomly pick up a meta replica region for meta lookup. 
When clients receive
+ * RegionNotServedException from region servers, it will add these region 
locations to the stale
+ * location cache. The stale cache will be cleaned up periodically by a chore.
+ */
+
+/**
+ * StaleLocationCacheEntry is the entry when a stale location is reported by 
an client.
+ */
+class StaleLocationCacheEntry {

Review comment:
       Unless the inner class instance is explicitly bound to the lifetime of 
the outer class and the outer class's implementation details, I generally 
agree. In this case, you have a simple POJO, so a separate class or at least a 
static inner class is best.

##########
File path: 
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
##########
@@ -338,4 +476,134 @@ static boolean contains(List<Result> contains, List<Cell> 
cells) throws IOExcept
     }
     return !containsScanner.advance() && matches >= 1 && count >= matches && 
count == cells.size();
   }
+
+  private void doNGets(final Table table, final byte[][] keys) throws 
Exception {
+    for (byte[] key : keys) {
+      Result r = table.get(new Get(key));
+      assertArrayEquals(VALUE, r.getValue(HConstants.CATALOG_FAMILY, 
HConstants.CATALOG_FAMILY));
+    }
+  }
+
+  private void primaryNoChangeReplicaIncrease(final long[] before, final 
long[] after) {
+    assertEquals(before[RegionInfo.DEFAULT_REPLICA_ID],
+      after[RegionInfo.DEFAULT_REPLICA_ID]);
+
+    for (int i = 1; i < after.length; i ++) {
+      assertTrue(after[i] > before[i]);
+    }
+  }
+
+  private void primaryIncreaseReplicaNoChange(final long[] before, final 
long[] after) {
+    // There are read requests increase for primary meta replica.
+    assertTrue(after[RegionInfo.DEFAULT_REPLICA_ID] >
+      before[RegionInfo.DEFAULT_REPLICA_ID]);
+
+    // No change for replica regions
+    for (int i = 1; i < after.length; i ++) {
+      assertEquals(before[i], after[i]);
+    }
+  }
+
+  private void getMetaReplicaReadRequests(final Region[] metaRegions, final 
long[] counters) {
+    int i = 0;
+    for (Region r : metaRegions) {
+      LOG.info("read request for region {} is {}", r, 
r.getReadRequestsCount());
+      counters[i] = r.getReadRequestsCount();
+      i ++;
+    }
+  }
+
+  @Test
+  public void testHBaseMetaReplicaGets() throws Exception {
+
+    TableName tn = TableName.valueOf(this.name.getMethodName());
+    final Region[] metaRegions = getAllRegions(TableName.META_TABLE_NAME, 
numOfMetaReplica);
+    long[] readReqsForMetaReplicas = new long[numOfMetaReplica];
+    long[] readReqsForMetaReplicasAfterGet = new long[numOfMetaReplica];
+    long[] readReqsForMetaReplicasAfterMove = new long[numOfMetaReplica];
+    long[] readReqsForMetaReplicasAfterSecondMove = new long[numOfMetaReplica];
+    long[] readReqsForMetaReplicasAfterThirdGet = new long[numOfMetaReplica];
+    Region userRegion = null;
+    HRegionServer srcRs = null;
+    HRegionServer destRs = null;
+
+    try (Table table = HTU.createTable(tn, HConstants.CATALOG_FAMILY,
+      Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, 
HBaseTestingUtility.KEYS.length))) {
+      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, 
getMetaCells(table.getName()));
+      // load different values
+      HTU.loadTable(table, new byte[][] { HConstants.CATALOG_FAMILY }, VALUE);
+      for (int i = 0; i < NB_SERVERS; i++) {
+        HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
+        List<HRegion> onlineRegions = rs.getRegions(tn);
+        if (onlineRegions.size() > 0) {
+          userRegion = onlineRegions.get(0);
+          srcRs = rs;
+          if (i > 0) {
+            destRs = HTU.getMiniHBaseCluster().getRegionServer(0);
+          } else {
+            destRs = HTU.getMiniHBaseCluster().getRegionServer(1);
+          }
+        }
+      }
+
+      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicas);
+
+      Configuration c = new Configuration(HTU.getConfiguration());
+      c.setBoolean(HConstants.USE_META_REPLICAS, true);
+      c.set(HConstants.META_REPLICAS_MODE, "LoadBalance");
+      Connection connection = ConnectionFactory.createConnection(c);
+      Table tableForGet = connection.getTable(tn);
+      byte[][] getRows = new byte[HBaseTestingUtility.KEYS.length][];
+
+      int i = 0;
+      for (byte[] key : HBaseTestingUtility.KEYS) {
+        getRows[i] = key;
+        i++;
+      }
+      getRows[0] = Bytes.toBytes("aaa");
+      doNGets(tableForGet, getRows);
+
+      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterGet);
+
+      // There is no read requests increase for primary meta replica.
+      // For rest of meta replicas, there are more reads against them.
+      primaryNoChangeReplicaIncrease(readReqsForMetaReplicas, 
readReqsForMetaReplicasAfterGet);
+
+      // move one of regions so it meta cache may be invalid.
+      HTU.moveRegionAndWait(userRegion.getRegionInfo(), 
destRs.getServerName());
+
+      doNGets(tableForGet, getRows);
+
+      getMetaReplicaReadRequests(metaRegions, 
readReqsForMetaReplicasAfterMove);
+
+      // There are read requests increase for primary meta replica.
+      // For rest of meta replicas, there is no change as regionMove will tell 
the new location
+      primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterGet,
+        readReqsForMetaReplicasAfterMove);
+      // Move region again.
+      HTU.moveRegionAndWait(userRegion.getRegionInfo(), srcRs.getServerName());
+
+      // Wait until moveRegion cache timeout.
+      while 
(destRs.getMovedRegion(userRegion.getRegionInfo().getEncodedName()) != null) {
+        Thread.sleep(1000);

Review comment:
       Please use `waitFor` with a maximum time to wait, rather than an 
unbounded `Thread.sleep` loop.

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaReplicaLoadBalanceReplicaSimpleChooser.java
##########
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_META_REPLICA_NUM;
+import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
+import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
+
+/**
+ * MetaReplicaLoadBalanceReplicaSimpleChooser implements a simple meta replica 
load balancing
+ * algorithm. It maintains a stale location cache for each table. Whenever 
client looks up meta,
+ * it first check if the row is the stale location cache, if yes, this means 
the the location from
+ * meta replica is stale, it will go to the primary meta to look up 
update-to-date location;
+ * otherwise, it will randomly pick up a meta replica region for meta lookup. 
When clients receive
+ * RegionNotServedException from region servers, it will add these region 
locations to the stale
+ * location cache. The stale cache will be cleaned up periodically by a chore.
+ */
+
+/**
+ * StaleLocationCacheEntry is the entry when a stale location is reported by 
an client.
+ */
+class StaleLocationCacheEntry {
+  // meta replica id where
+  private int metaReplicaId;
+
+  // timestamp in milliseconds
+  private long timestamp;
+
+  private byte[] endKey;
+
+  StaleLocationCacheEntry(final int metaReplicaId, final byte[] endKey) {
+    this.metaReplicaId = metaReplicaId;
+    this.endKey = endKey;
+    timestamp = System.currentTimeMillis();
+  }
+
+  public byte[] getEndKey() {
+    return this.endKey;
+  }
+
+  public int getMetaReplicaId() {
+    return this.metaReplicaId;
+  }
+  public long getTimestamp() {
+    return this.timestamp;
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this)
+      .append("endKey", endKey)
+      .append("metaReplicaId", metaReplicaId)
+      .append("timestamp", timestamp)
+      .toString();
+  }
+}
+
+/**
+ * A simple implementation of MetaReplicaLoadBalanceReplicaChooser.
+ *
+ * It follows a simple algorithm to choose a meta replica to go:
+ *
+ *  1. If there is no stale location entry for rows it looks up, it will 
randomly
+ *     pick a meta replica region to do lookup.
+ *  2. If the location from meta replica region is stale, client gets 
RegionNotServedException
+ *     from region server, in this case, it will create 
StaleLocationCacheEntry in
+ *     MetaReplicaLoadBalanceReplicaSimpleChooser.
+ *  3. When client tries to do meta lookup, it checks StaleLocationCache first 
for rows it tries to
+ *     lookup, if entry exists, it will go with primary meta region to do 
lookup; otherwise, it
+ *     will follow step 1.
+ *  4. A chore will periodically run to clean up cache entries in the 
StaleLocationCache.
+ */
+class MetaReplicaLoadBalanceReplicaSimpleChooser implements 
MetaReplicaLoadBalanceReplicaChooser {

Review comment:
       > For a normal case, > 99% of region locations from meta replica will be 
up to date.
   
   It would be a good idea to state this up-front in a javadoc comment so that 
readers and future maintainers understand your baseline assumptions.

##########
File path: hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
##########
@@ -1128,6 +1128,9 @@
 
   /** Conf key for enabling meta replication */
   public static final String USE_META_REPLICAS = "hbase.meta.replicas.use";
+  public static final String META_REPLICAS_MODE = "hbase.meta.replicas.mode";
+  public static final String META_REPLICAS_MODE_LOADBALANCE_REPILCA_CHOOSER =
+    "hbase.meta.replicas.mode.loadbalance.replica.chooser";

Review comment:
       I see you're forced to work with existing configuration names, but I 
wish these were all prefixed with `hbase.meta_replica.`

##########
File path: 
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
##########
@@ -176,18 +184,136 @@ private boolean 
isMetaRegionReplicaReplicationSource(HRegionServer hrs) {
    */
   @Test
   public void testHBaseMetaReplicates() throws Exception {
-    try (Table table = 
HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_0"),
-      HConstants.CATALOG_FAMILY,
-        Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, 
HBaseTestingUtility.KEYS.length)))  {
-      verifyReplication(TableName.META_TABLE_NAME, NB_SERVERS, 
getMetaCells(table.getName()));
+    try (Table table = HTU
+      .createTable(TableName.valueOf(this.name.getMethodName() + "_0"), 
HConstants.CATALOG_FAMILY,

Review comment:
       FYI, there's a `TableNameTestRule` for generating the `TableName` from 
the running method. I guess it doesn't help where you're adding a suffix 
though...

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaReplicaLoadBalanceReplicaSimpleChooser.java
##########
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_META_REPLICA_NUM;
+import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
+import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
+
+/**
+ * MetaReplicaLoadBalanceReplicaSimpleChooser implements a simple meta replica 
load balancing
+ * algorithm. It maintains a stale location cache for each table. Whenever 
client looks up meta,
+ * it first check if the row is the stale location cache, if yes, this means 
the the location from
+ * meta replica is stale, it will go to the primary meta to look up 
update-to-date location;
+ * otherwise, it will randomly pick up a meta replica region for meta lookup. 
When clients receive
+ * RegionNotServedException from region servers, it will add these region 
locations to the stale
+ * location cache. The stale cache will be cleaned up periodically by a chore.
+ */
+
+/**
+ * StaleLocationCacheEntry is the entry when a stale location is reported by 
an client.
+ */
+class StaleLocationCacheEntry {
+  // meta replica id where
+  private int metaReplicaId;
+
+  // timestamp in milliseconds
+  private long timestamp;
+
+  private byte[] endKey;
+
+  StaleLocationCacheEntry(final int metaReplicaId, final byte[] endKey) {
+    this.metaReplicaId = metaReplicaId;
+    this.endKey = endKey;
+    timestamp = System.currentTimeMillis();
+  }
+
+  public byte[] getEndKey() {
+    return this.endKey;
+  }
+
+  public int getMetaReplicaId() {
+    return this.metaReplicaId;
+  }
+  public long getTimestamp() {
+    return this.timestamp;
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this)
+      .append("endKey", endKey)
+      .append("metaReplicaId", metaReplicaId)
+      .append("timestamp", timestamp)
+      .toString();
+  }
+}
+
+/**
+ * A simple implementation of MetaReplicaLoadBalanceReplicaChooser.
+ *
+ * It follows a simple algorithm to choose a meta replica to go:
+ *
+ *  1. If there is no stale location entry for rows it looks up, it will 
randomly
+ *     pick a meta replica region to do lookup.
+ *  2. If the location from meta replica region is stale, client gets 
RegionNotServedException
+ *     from region server, in this case, it will create 
StaleLocationCacheEntry in
+ *     MetaReplicaLoadBalanceReplicaSimpleChooser.
+ *  3. When client tries to do meta lookup, it checks StaleLocationCache first 
for rows it tries to
+ *     lookup, if entry exists, it will go with primary meta region to do 
lookup; otherwise, it
+ *     will follow step 1.
+ *  4. A chore will periodically run to clean up cache entries in the 
StaleLocationCache.
+ */
+class MetaReplicaLoadBalanceReplicaSimpleChooser implements 
MetaReplicaLoadBalanceReplicaChooser {
+  private static final Logger LOG =
+    LoggerFactory.getLogger(MetaReplicaLoadBalanceReplicaSimpleChooser.class);
+  private final long STALE_CACHE_TIMEOUT_IN_MILLISECONDS = 3000; // 3 seconds
+  private final int STALE_CACHE_CLEAN_CHORE_INTERVAL = 1500; // 1.5 seconds
+
+  private final class StaleTableCache {
+    private final ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> 
cache =
+      new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
+  }
+
+  private final ConcurrentMap<TableName, StaleTableCache> staleCache;
+  private final int numOfMetaReplicas;
+  private final AsyncConnectionImpl conn;
+
+  MetaReplicaLoadBalanceReplicaSimpleChooser(final AsyncConnectionImpl conn) {
+    staleCache = new ConcurrentHashMap<>();
+    this.numOfMetaReplicas = conn.getConfiguration().getInt(
+      META_REPLICAS_NUM, DEFAULT_META_REPLICA_NUM);
+    this.conn = conn;
+    this.conn.getChoreService().scheduleChore(getCacheCleanupChore(this));
+  }
+
+  /**
+   * When a client runs into RegionNotServingException, it will call this 
method to
+   * update Chooser's internal state.
+   * @param loc the location which causes exception.
+   * @param fromMetaReplicaId the meta replica id where the location comes 
from.
+   */
+  public void updateCacheOnError(final HRegionLocation loc, final int 
fromMetaReplicaId) {

Review comment:
       Please use `@Override` annotations.

##########
File path: 
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
##########
@@ -63,17 +68,20 @@
 /**
  * Tests RegionReplicaReplicationEndpoint class for hbase:meta by setting up 
region replicas and
  * verifying async wal replication replays the edits to the secondary region 
in various scenarios.
+ *
  * @see TestRegionReplicaReplicationEndpoint
  */
 @Category({LargeTests.class})

Review comment:
       Not yours, but I think this can be added to the `MasterTests` 
category. Or `RegionServerTests`.. I'm not even clear on which one this is at 
this point. Maybe need a category for `CatalogueTests`.

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaReplicaLoadBalanceReplicaSimpleChooser.java
##########
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_META_REPLICA_NUM;
+import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
+import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
+
+/**
+ * MetaReplicaLoadBalanceReplicaSimpleChooser implements a simple meta replica 
load balancing
+ * algorithm. It maintains a stale location cache for each table. Whenever 
client looks up meta,
+ * it first check if the row is the stale location cache, if yes, this means 
the the location from
+ * meta replica is stale, it will go to the primary meta to look up 
update-to-date location;
+ * otherwise, it will randomly pick up a meta replica region for meta lookup. 
When clients receive
+ * RegionNotServedException from region servers, it will add these region 
locations to the stale
+ * location cache. The stale cache will be cleaned up periodically by a chore.
+ */
+
+/**
+ * StaleLocationCacheEntry is the entry when a stale location is reported by 
an client.
+ */
+class StaleLocationCacheEntry {
+  // meta replica id where
+  private int metaReplicaId;
+
+  // timestamp in milliseconds
+  private long timestamp;
+
+  private byte[] endKey;
+
+  StaleLocationCacheEntry(final int metaReplicaId, final byte[] endKey) {
+    this.metaReplicaId = metaReplicaId;
+    this.endKey = endKey;
+    timestamp = System.currentTimeMillis();
+  }
+
+  public byte[] getEndKey() {
+    return this.endKey;
+  }
+
+  public int getMetaReplicaId() {
+    return this.metaReplicaId;
+  }
+  public long getTimestamp() {
+    return this.timestamp;
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this)
+      .append("endKey", endKey)
+      .append("metaReplicaId", metaReplicaId)
+      .append("timestamp", timestamp)
+      .toString();
+  }
+}
+
+/**
+ * A simple implementation of MetaReplicaLoadBalanceReplicaChooser.
+ *
+ * It follows a simple algorithm to choose a meta replica to go:
+ *
+ *  1. If there is no stale location entry for rows it looks up, it will 
randomly
+ *     pick a meta replica region to do lookup.
+ *  2. If the location from meta replica region is stale, client gets 
RegionNotServedException
+ *     from region server, in this case, it will create 
StaleLocationCacheEntry in
+ *     MetaReplicaLoadBalanceReplicaSimpleChooser.
+ *  3. When client tries to do meta lookup, it checks StaleLocationCache first 
for rows it tries to
+ *     lookup, if entry exists, it will go with primary meta region to do 
lookup; otherwise, it
+ *     will follow step 1.
+ *  4. A chore will periodically run to clean up cache entries in the 
StaleLocationCache.
+ */
+class MetaReplicaLoadBalanceReplicaSimpleChooser implements 
MetaReplicaLoadBalanceReplicaChooser {
+  private static final Logger LOG =
+    LoggerFactory.getLogger(MetaReplicaLoadBalanceReplicaSimpleChooser.class);
+  private final long STALE_CACHE_TIMEOUT_IN_MILLISECONDS = 3000; // 3 seconds
+  private final int STALE_CACHE_CLEAN_CHORE_INTERVAL = 1500; // 1.5 seconds
+
+  private final class StaleTableCache {
+    private final ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> 
cache =
+      new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
+  }
+
+  private final ConcurrentMap<TableName, StaleTableCache> staleCache;
+  private final int numOfMetaReplicas;
+  private final AsyncConnectionImpl conn;
+
+  MetaReplicaLoadBalanceReplicaSimpleChooser(final AsyncConnectionImpl conn) {
+    staleCache = new ConcurrentHashMap<>();
+    this.numOfMetaReplicas = conn.getConfiguration().getInt(
+      META_REPLICAS_NUM, DEFAULT_META_REPLICA_NUM);
+    this.conn = conn;
+    this.conn.getChoreService().scheduleChore(getCacheCleanupChore(this));
+  }
+
+  /**
+   * When a client runs into RegionNotServingException, it will call this 
method to
+   * update Chooser's internal state.
+   * @param loc the location which causes exception.
+   * @param fromMetaReplicaId the meta replica id where the location comes 
from.
+   */
+  public void updateCacheOnError(final HRegionLocation loc, final int 
fromMetaReplicaId) {
+    StaleTableCache tableCache =
+      computeIfAbsent(staleCache, loc.getRegion().getTable(), 
StaleTableCache::new);
+    byte[] startKey = loc.getRegion().getStartKey();
+    tableCache.cache.putIfAbsent(startKey,
+      new StaleLocationCacheEntry(fromMetaReplicaId, 
loc.getRegion().getEndKey()));
+    LOG.debug("Add entry to stale cache for table {} with startKey {}, {}",

Review comment:
       nit: name the "endKey" in your log message.

##########
File path: 
hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorWithMetaReplicaLoadBalance.java
##########
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncNonMetaRegionLocatorWithMetaReplicaLoadBalance

Review comment:
       oh no, please not test inheritance :'(
   
   Can this be done as a `Parameterized` test instead?

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaReplicaLoadBalanceReplicaChooser.java
##########
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * There are two modes with meta replica support.
+ *   HighAvailable    - Client sends requests to the primary meta region 
first, within a
+ *                      configured amount of time, if  there is no response 
coming back,
+ *                      client sends requests to all replica regions and takes 
the first
+ *                      response.
+ *
+ *   LoadBalance      - Client sends requests to meta replica regions in a 
round-robin mode,
+ *                      if results from replica regions are stale, next time, 
client sends requests for
+ *                      these stable locations to the primary meta region. In 
this mode, scan
+ *                      requests are load balanced across all replica regions.
+ */
+enum MetaReplicaMode {
+  None,
+  HighAvailable,
+  LoadBalance
+}
+
+/**
+ * A Meta replica chooser decides which meta replica to go for scan requests.
+ */
+@InterfaceAudience.Private
+public interface MetaReplicaLoadBalanceReplicaChooser {
+
+  void updateCacheOnError(final HRegionLocation loc, final int 
fromMetaReplicaId);
+  int chooseReplicaToGo(final TableName tablename, final byte[] row,

Review comment:
       Oh, I understand. this is the TableName of the data table from which 
we're locating a region. Maybe there's a better parameter name? Likewise, `row` 
should be `startRow`? Or no, the client doesn't know the region split points or 
the start row, only the data row they're seeking.
   
   Yeah, I think this method name and its parameters could be improved for 
readability. Something like `int selectReplicaIdFor(TableName targetTable, 
byte[] targetStartRow)`. The correct name gets even more confusing/important 
when we have split meta in the works...

##########
File path: 
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
##########
@@ -338,4 +476,134 @@ static boolean contains(List<Result> contains, List<Cell> 
cells) throws IOExcept
     }
     return !containsScanner.advance() && matches >= 1 && count >= matches && 
count == cells.size();
   }
+
+  private void doNGets(final Table table, final byte[][] keys) throws 
Exception {

Review comment:
       nit: for helper methods whose purpose is to make assertions, I like to 
prefix their names with `assert`. So, `assertDoNGets`, 
`assertPrimaryNoChangeReplicaIncrease`, &c.

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
##########
@@ -444,6 +492,8 @@ private void locateInMeta(TableName tableName, 
LocateRequest req) {
 
       @Override
       public void onError(Throwable error) {
+        // TODO: if it fails, with meta replica load balance, it may try with 
another meta
+        // replica. This improvement will be done later.

Review comment:
       Do you have a Jira ID for this TODO?

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
##########
@@ -198,6 +208,36 @@ private boolean tryComplete(LocateRequest req, 
CompletableFuture<RegionLocations
       conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, 
DEFAULT_LOCATE_PREFETCH_LIMIT);
     this.useMetaReplicas =
       conn.getConfiguration().getBoolean(USE_META_REPLICAS, 
DEFAULT_USE_META_REPLICAS);
+
+    if (this.useMetaReplicas) {

Review comment:
       it's a nit-picky thing, but I like early escape better than 
deeply-nested if blocks because i think they're easier to read.

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaReplicaLoadBalanceReplicaChooser.java
##########
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * There are two modes with meta replica support.
+ *   HighAvailable    - Client sends requests to the primary meta region 
first, within a

Review comment:
       Isn't this concept already called "Hedged Read" in HDFS and HBase? Why 
give it this new name?

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaReplicaLoadBalanceReplicaSimpleChooser.java
##########
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_META_REPLICA_NUM;
+import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
+import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
+
+/**
+ * MetaReplicaLoadBalanceReplicaSimpleChooser implements a simple meta replica 
load balancing
+ * algorithm. It maintains a stale location cache for each table. Whenever 
client looks up meta,
+ * it first check if the row is the stale location cache, if yes, this means 
the the location from
+ * meta replica is stale, it will go to the primary meta to look up 
update-to-date location;
+ * otherwise, it will randomly pick up a meta replica region for meta lookup. 
When clients receive
+ * RegionNotServedException from region servers, it will add these region 
locations to the stale
+ * location cache. The stale cache will be cleaned up periodically by a chore.
+ */
+
+/**
+ * StaleLocationCacheEntry is the entry when a stale location is reported by 
an client.
+ */
+class StaleLocationCacheEntry {
+  // meta replica id where
+  private int metaReplicaId;
+
+  // timestamp in milliseconds
+  private long timestamp;
+
+  private byte[] endKey;
+
+  StaleLocationCacheEntry(final int metaReplicaId, final byte[] endKey) {
+    this.metaReplicaId = metaReplicaId;
+    this.endKey = endKey;
+    timestamp = System.currentTimeMillis();
+  }
+
+  public byte[] getEndKey() {
+    return this.endKey;
+  }
+
+  public int getMetaReplicaId() {
+    return this.metaReplicaId;
+  }
+  public long getTimestamp() {
+    return this.timestamp;
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this)
+      .append("endKey", endKey)
+      .append("metaReplicaId", metaReplicaId)
+      .append("timestamp", timestamp)
+      .toString();
+  }
+}
+
+/**
+ * A simple implementation of MetaReplicaLoadBalanceReplicaChooser.
+ *
+ * It follows a simple algorithm to choose a meta replica to go:
+ *
+ *  1. If there is no stale location entry for rows it looks up, it will 
randomly
+ *     pick a meta replica region to do lookup.
+ *  2. If the location from meta replica region is stale, client gets 
RegionNotServedException
+ *     from region server, in this case, it will create 
StaleLocationCacheEntry in
+ *     MetaReplicaLoadBalanceReplicaSimpleChooser.
+ *  3. When client tries to do meta lookup, it checks StaleLocationCache first 
for rows it tries to
+ *     lookup, if entry exists, it will go with primary meta region to do 
lookup; otherwise, it
+ *     will follow step 1.
+ *  4. A chore will periodically run to clean up cache entries in the 
StaleLocationCache.
+ */
+class MetaReplicaLoadBalanceReplicaSimpleChooser implements 
MetaReplicaLoadBalanceReplicaChooser {
+  private static final Logger LOG =
+    LoggerFactory.getLogger(MetaReplicaLoadBalanceReplicaSimpleChooser.class);
+  private final long STALE_CACHE_TIMEOUT_IN_MILLISECONDS = 3000; // 3 seconds
+  private final int STALE_CACHE_CLEAN_CHORE_INTERVAL = 1500; // 1.5 seconds

Review comment:
       nit: include the unit "milliseconds" or "ms" in this field name as well.

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaReplicaLoadBalanceReplicaSimpleChooser.java
##########
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_META_REPLICA_NUM;
+import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
+import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
+
+/**
+ * MetaReplicaLoadBalanceReplicaSimpleChooser implements a simple meta replica 
load balancing
+ * algorithm. It maintains a stale location cache for each table. Whenever 
client looks up meta,
+ * it first check if the row is the stale location cache, if yes, this means 
the the location from
+ * meta replica is stale, it will go to the primary meta to look up 
update-to-date location;
+ * otherwise, it will randomly pick up a meta replica region for meta lookup. 
When clients receive
+ * RegionNotServedException from region servers, it will add these region 
locations to the stale
+ * location cache. The stale cache will be cleaned up periodically by a chore.
+ */
+
+/**
+ * StaleLocationCacheEntry is the entry when a stale location is reported by 
an client.
+ */
+class StaleLocationCacheEntry {
+  // meta replica id where
+  private int metaReplicaId;
+
+  // timestamp in milliseconds
+  private long timestamp;
+
+  private byte[] endKey;
+
+  StaleLocationCacheEntry(final int metaReplicaId, final byte[] endKey) {
+    this.metaReplicaId = metaReplicaId;
+    this.endKey = endKey;
+    timestamp = System.currentTimeMillis();

Review comment:
       nit, use EnvironmentEdge to retrieve a clock time.

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
##########
@@ -392,6 +406,16 @@ public Connection toConnection() {
     return c;
   }
 
+  private boolean isMetaReplicaLBMode() {
+    if (conf.getBoolean(USE_META_REPLICAS, DEFAULT_USE_META_REPLICAS)) {
+      String metaReplicaMode = conf.get(META_REPLICAS_MODE, "");
+      if (MetaReplicaMode.LoadBalance.toString().equals(metaReplicaMode)) {

Review comment:
       Is this configuration point intentionally case-sensitive?

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaReplicaLoadBalanceReplicaSimpleChooser.java
##########
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_META_REPLICA_NUM;
+import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
+import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
+
+/**
+ * MetaReplicaLoadBalanceReplicaSimpleChooser implements a simple meta replica 
load balancing
+ * algorithm. It maintains a stale location cache for each table. Whenever 
client looks up meta,
+ * it first check if the row is the stale location cache, if yes, this means 
the the location from
+ * meta replica is stale, it will go to the primary meta to look up 
update-to-date location;
+ * otherwise, it will randomly pick up a meta replica region for meta lookup. 
When clients receive
+ * RegionNotServedException from region servers, it will add these region 
locations to the stale
+ * location cache. The stale cache will be cleaned up periodically by a chore.
+ */
+
+/**
+ * StaleLocationCacheEntry is the entry when a stale location is reported by 
an client.
+ */
+class StaleLocationCacheEntry {
+  // meta replica id where
+  private int metaReplicaId;
+
+  // timestamp in milliseconds
+  private long timestamp;
+
+  private byte[] endKey;
+
+  StaleLocationCacheEntry(final int metaReplicaId, final byte[] endKey) {
+    this.metaReplicaId = metaReplicaId;
+    this.endKey = endKey;
+    timestamp = System.currentTimeMillis();
+  }
+
+  public byte[] getEndKey() {
+    return this.endKey;
+  }
+
+  public int getMetaReplicaId() {
+    return this.metaReplicaId;
+  }
+  public long getTimestamp() {
+    return this.timestamp;
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this)

Review comment:
       Please use the `ShortPrefixToStringStyle` style.

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaReplicaLoadBalanceReplicaSimpleChooser.java
##########
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_META_REPLICA_NUM;
+import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
+import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
+
+/**
+ * MetaReplicaLoadBalanceReplicaSimpleChooser implements a simple meta replica 
load balancing
+ * algorithm. It maintains a stale location cache for each table. Whenever 
client looks up meta,
+ * it first check if the row is the stale location cache, if yes, this means 
the the location from
+ * meta replica is stale, it will go to the primary meta to look up 
update-to-date location;
+ * otherwise, it will randomly pick up a meta replica region for meta lookup. 
When clients receive
+ * RegionNotServedException from region servers, it will add these region 
locations to the stale
+ * location cache. The stale cache will be cleaned up periodically by a chore.
+ */
+
+/**
+ * StaleLocationCacheEntry is the entry when a stale location is reported by 
an client.
+ */
+class StaleLocationCacheEntry {
+  // meta replica id where
+  private int metaReplicaId;
+
+  // timestamp in milliseconds
+  private long timestamp;

Review comment:
       nit: can this be a `java.time.Instant`?

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
##########
@@ -577,6 +627,15 @@ private void removeLocationFromCache(HRegionLocation loc) {
       if (!canUpdateOnError(loc, oldLoc)) {
         return;
       }
+      // Tell metaReplicaChooser that the location is stale. It will create a 
stale entry
+      // with timestamp internally. Next time the client looks up the same 
location,
+      // it will pick a different meta replica region. For the current 
implementation,

Review comment:
       Using the default replica id here sounds like it could be dangerous 
later. Would it make more sense to pass in `null` or `Optional.empty`?
   
   Same question on a Jira ID for the TODO followup.

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
##########
@@ -198,6 +208,36 @@ private boolean tryComplete(LocateRequest req, 
CompletableFuture<RegionLocations
       conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, 
DEFAULT_LOCATE_PREFETCH_LIMIT);
     this.useMetaReplicas =
       conn.getConfiguration().getBoolean(USE_META_REPLICAS, 
DEFAULT_USE_META_REPLICAS);
+
+    if (this.useMetaReplicas) {
+      if (conn.getConfiguration()
+        .get(META_REPLICAS_MODE, 
"").equals(MetaReplicaMode.LoadBalance.toString())) {

Review comment:
       Another nit, but for readability, how about using local variables 
instead of digging into the configuration object, converting to strings, and 
comparing, all within the if condition?

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaReplicaLoadBalanceReplicaChooser.java
##########
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * There are two modes with meta replica support.
+ *   HighAvailable    - Client sends requests to the primary meta region 
first, within a
+ *                      configured amount of time, if  there is no response 
coming back,
+ *                      client sends requests to all replica regions and takes 
the first
+ *                      response.
+ *
+ *   LoadBalance      - Client sends requests to meta replica regions in a 
round-robin mode,
+ *                      if results from replica regions are stale, next time, 
client sends requests for
+ *                      these stable locations to the primary meta region. In 
this mode, scan
+ *                      requests are load balanced across all replica regions.
+ */
+enum MetaReplicaMode {
+  None,
+  HighAvailable,
+  LoadBalance
+}
+
+/**
+ * A Meta replica chooser decides which meta replica to go for scan requests.
+ */
+@InterfaceAudience.Private
+public interface MetaReplicaLoadBalanceReplicaChooser {
+
+  void updateCacheOnError(final HRegionLocation loc, final int 
fromMetaReplicaId);
+  int chooseReplicaToGo(final TableName tablename, final byte[] row,

Review comment:
       Interesting that this is Meta-specific but it accepts a `TableName` as a 
parameter; it seems redundant in the interface.

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaReplicaLoadBalanceReplicaSimpleChooser.java
##########
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_META_REPLICA_NUM;
+import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
+import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
+
+/**
+ * MetaReplicaLoadBalanceReplicaSimpleChooser implements a simple meta replica 
load balancing
+ * algorithm. It maintains a stale location cache for each table. Whenever 
client looks up meta,
+ * it first check if the row is the stale location cache, if yes, this means 
the the location from
+ * meta replica is stale, it will go to the primary meta to look up 
update-to-date location;
+ * otherwise, it will randomly pick up a meta replica region for meta lookup. 
When clients receive
+ * RegionNotServedException from region servers, it will add these region 
locations to the stale
+ * location cache. The stale cache will be cleaned up periodically by a chore.
+ */
+
+/**
+ * StaleLocationCacheEntry is the entry when a stale location is reported by 
an client.
+ */
+class StaleLocationCacheEntry {
+  // meta replica id where
+  private int metaReplicaId;
+
+  // timestamp in milliseconds
+  private long timestamp;
+
+  private byte[] endKey;
+
+  StaleLocationCacheEntry(final int metaReplicaId, final byte[] endKey) {
+    this.metaReplicaId = metaReplicaId;
+    this.endKey = endKey;
+    timestamp = System.currentTimeMillis();
+  }
+
+  public byte[] getEndKey() {
+    return this.endKey;
+  }
+
+  public int getMetaReplicaId() {
+    return this.metaReplicaId;
+  }
+  public long getTimestamp() {
+    return this.timestamp;
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this)
+      .append("endKey", endKey)
+      .append("metaReplicaId", metaReplicaId)
+      .append("timestamp", timestamp)
+      .toString();
+  }
+}
+
+/**
+ * A simple implementation of MetaReplicaLoadBalanceReplicaChooser.
+ *
+ * It follows a simple algorithm to choose a meta replica to go:
+ *
+ *  1. If there is no stale location entry for rows it looks up, it will 
randomly
+ *     pick a meta replica region to do lookup.
+ *  2. If the location from meta replica region is stale, client gets 
RegionNotServedException
+ *     from region server, in this case, it will create 
StaleLocationCacheEntry in
+ *     MetaReplicaLoadBalanceReplicaSimpleChooser.
+ *  3. When client tries to do meta lookup, it checks StaleLocationCache first 
for rows it tries to
+ *     lookup, if entry exists, it will go with primary meta region to do 
lookup; otherwise, it
+ *     will follow step 1.
+ *  4. A chore will periodically run to clean up cache entries in the 
StaleLocationCache.
+ */
+class MetaReplicaLoadBalanceReplicaSimpleChooser implements 
MetaReplicaLoadBalanceReplicaChooser {
+  private static final Logger LOG =
+    LoggerFactory.getLogger(MetaReplicaLoadBalanceReplicaSimpleChooser.class);
+  private final long STALE_CACHE_TIMEOUT_IN_MILLISECONDS = 3000; // 3 seconds
+  private final int STALE_CACHE_CLEAN_CHORE_INTERVAL = 1500; // 1.5 seconds

Review comment:
       How did you come to these configuration values? Would a client ever want 
to customize them for their environment? 

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaReplicaLoadBalanceReplicaSimpleChooser.java
##########
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_META_REPLICA_NUM;
+import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
+import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
+
+/**
+ * MetaReplicaLoadBalanceReplicaSimpleChooser implements a simple meta replica 
load balancing
+ * algorithm. It maintains a stale location cache for each table. Whenever 
client looks up meta,
+ * it first check if the row is the stale location cache, if yes, this means 
the the location from
+ * meta replica is stale, it will go to the primary meta to look up 
update-to-date location;
+ * otherwise, it will randomly pick up a meta replica region for meta lookup. 
When clients receive
+ * RegionNotServedException from region servers, it will add these region 
locations to the stale
+ * location cache. The stale cache will be cleaned up periodically by a chore.
+ */
+
+/**
+ * StaleLocationCacheEntry is the entry when a stale location is reported by 
an client.
+ */
+class StaleLocationCacheEntry {
+  // meta replica id where
+  private int metaReplicaId;
+
+  // timestamp in milliseconds
+  private long timestamp;
+
+  private byte[] endKey;
+
+  StaleLocationCacheEntry(final int metaReplicaId, final byte[] endKey) {
+    this.metaReplicaId = metaReplicaId;
+    this.endKey = endKey;
+    timestamp = System.currentTimeMillis();
+  }
+
+  public byte[] getEndKey() {
+    return this.endKey;
+  }
+
+  public int getMetaReplicaId() {
+    return this.metaReplicaId;
+  }
+  public long getTimestamp() {
+    return this.timestamp;
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this)
+      .append("endKey", endKey)
+      .append("metaReplicaId", metaReplicaId)
+      .append("timestamp", timestamp)
+      .toString();
+  }
+}

Review comment:
       This is a location cache entry, and just a simple POJO, which means it's 
probably used in a collection. Please also implement equals and hashCode 
methods. Commons provides helper classes for those (and IntelliJ can 
auto-generate them for you) as well.

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaReplicaLoadBalanceReplicaChooser.java
##########
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * There are two modes with meta replica support.
+ *   HighAvailable    - Client sends requests to the primary meta region 
first, within a
+ *                      configured amount of time, if  there is no response 
coming back,
+ *                      client sends requests to all replica regions and takes 
the first
+ *                      response.
+ *
+ *   LoadBalance      - Client sends requests to meta replica regions in a 
round-robin mode,

Review comment:
       If this implementation is explicitly round-robin, please call it 
"RoundRobin". There are other load-balancing strategies that we might 
implement, so this "LoadBalance" is too generic.

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
##########
@@ -198,6 +208,36 @@ private boolean tryComplete(LocateRequest req, 
CompletableFuture<RegionLocations
       conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, 
DEFAULT_LOCATE_PREFETCH_LIMIT);
     this.useMetaReplicas =
       conn.getConfiguration().getBoolean(USE_META_REPLICAS, 
DEFAULT_USE_META_REPLICAS);
+
+    if (this.useMetaReplicas) {
+      if (conn.getConfiguration()
+        .get(META_REPLICAS_MODE, 
"").equals(MetaReplicaMode.LoadBalance.toString())) {
+        this.metaReplicaMode = MetaReplicaMode.LoadBalance;
+        int numOfMetaReplicas = conn.getConfiguration().getInt(
+          META_REPLICAS_NUM, DEFAULT_META_REPLICA_NUM);
+        if (numOfMetaReplicas <= 1) {
+          LOG.error("Configured to support meta replica load balance mode,"
+            + " but there is no meta replica configured");
+          this.metaReplicaChooser = null;
+        } else {
+          String metaReplicaChooserImpl = conn.getConfiguration().get(

Review comment:
       Please move this ChooserImpl instantiation into a factory method.

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaReplicaLoadBalanceReplicaChooser.java
##########
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**

Review comment:
       Nice javadoc! Would you mind inserting some javadoc format markers so 
that this renders nicely in the IDE as well? (`<p>`, `<ul>`, &c.)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to