This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new 46377c5d966 HBASE-28436 Use connection url to specify the connection registry information (#5770)
46377c5d966 is described below

commit 46377c5d96683a283c969a5d7c5ce5b12cfecedb
Author: Duo Zhang <zhang...@apache.org>
AuthorDate: Tue Apr 23 16:05:26 2024 +0800

    HBASE-28436 Use connection url to specify the connection registry information (#5770)
    
    Signed-off-by: Istvan Toth <st...@apache.org>
    Signed-off-by: Nick Dimiduk <ndimi...@apache.org>
    Reviewed-by: Bryan Beaudreault <bbeaudrea...@apache.org>
    (cherry picked from commit e3761baec1158d617c46bbdf54725206544717e9)
---
 hbase-client/pom.xml                               |   5 +
 .../hadoop/hbase/client/ConnectionFactory.java     | 313 +++++++++++++++++----
 .../hbase/client/ConnectionRegistryFactory.java    |  64 ++++-
 ...tory.java => ConnectionRegistryURIFactory.java} |  27 +-
 ...tory.java => RpcConnectionRegistryCreator.java} |  30 +-
 ...ctory.java => ZKConnectionRegistryCreator.java} |  33 ++-
 ...adoop.hbase.client.ConnectionRegistryURIFactory |  17 ++
 .../TestConnectionRegistryCreatorUriParsing.java   | 157 +++++++++++
 .../hbase/client/ClusterConnectionFactory.java     |   2 +-
 .../hbase/client/AbstractTestRegionLocator.java    |   2 +-
 .../client/TestAsyncAdminWithRegionReplicas.java   |   2 +-
 .../hbase/client/TestAsyncMetaRegionLocator.java   |   3 +-
 .../client/TestAsyncNonMetaRegionLocator.java      |   2 +-
 ...stAsyncNonMetaRegionLocatorConcurrenyLimit.java |   2 +-
 .../hbase/client/TestAsyncRegionLocator.java       |   2 +-
 .../TestAsyncSingleRequestRpcRetryingCaller.java   |   2 +-
 .../client/TestAsyncTableUseMetaReplicas.java      |   3 +-
 ...ReadWriteWithDifferentConnectionRegistries.java | 177 ++++++++++++
 ...estCatalogReplicaLoadBalanceSimpleSelector.java |   3 +-
 .../hbase/client/TestMetaRegionLocationCache.java  |   3 +-
 20 files changed, 737 insertions(+), 112 deletions(-)

diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml
index dba79ff07ee..8da4ab8e837 100644
--- a/hbase-client/pom.xml
+++ b/hbase-client/pom.xml
@@ -170,6 +170,11 @@
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-inline</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-library</artifactId>
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
index 716fb4863fe..f4ef4496dfc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import java.net.URI;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
 import java.util.Map;
@@ -89,41 +90,55 @@ public class ConnectionFactory {
    * instance. Typical usage:
    *
    * <pre>
-   * Connection connection = ConnectionFactory.createConnection();
-   * Table table = connection.getTable(TableName.valueOf("mytable"));
-   * try {
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
    *   table.get(...);
    *   ...
-   * } finally {
-   *   table.close();
-   *   connection.close();
    * }
    * </pre>
    *
    * @return Connection object for <code>conf</code>
    */
   public static Connection createConnection() throws IOException {
-    Configuration conf = HBaseConfiguration.create();
-    return createConnection(conf, null, AuthUtil.loginClient(conf));
+    return createConnection(HBaseConfiguration.create());
+  }
+
+  /**
+   * Create a new Connection instance using default HBaseConfiguration. 
Connection encapsulates all
+   * housekeeping for a connection to the cluster. All tables and interfaces 
created from returned
+   * connection share zookeeper connection, meta cache, and connections to 
region servers and
+   * masters. <br>
+   * The caller is responsible for calling {@link Connection#close()} on the 
returned connection
+   * instance. Typical usage:
+   *
+   * <pre>
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
+   *   table.get(...);
+   *   ...
+   * }
+   * </pre>
+   *
+   * @param connectionUri the connection uri for the hbase cluster
+   * @return Connection object for <code>conf</code>
+   */
+  public static Connection createConnection(URI connectionUri) throws 
IOException {
+    return createConnection(connectionUri, HBaseConfiguration.create());
   }
 
   /**
    * Create a new Connection instance using the passed <code>conf</code> 
instance. Connection
    * encapsulates all housekeeping for a connection to the cluster. All tables 
and interfaces
-   * created from returned connection share zookeeper connection, meta cache, 
and connections to
-   * region servers and masters. <br>
+   * created from returned connection share zookeeper connection(if used), 
meta cache, and
+   * connections to region servers and masters. <br>
    * The caller is responsible for calling {@link Connection#close()} on the 
returned connection
    * instance. Typical usage:
    *
    * <pre>
-   * Connection connection = ConnectionFactory.createConnection(conf);
-   * Table table = connection.getTable(TableName.valueOf("mytable"));
-   * try {
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
    *   table.get(...);
    *   ...
-   * } finally {
-   *   table.close();
-   *   connection.close();
    * }
    * </pre>
    *
@@ -137,20 +152,41 @@ public class ConnectionFactory {
   /**
    * Create a new Connection instance using the passed <code>conf</code> 
instance. Connection
    * encapsulates all housekeeping for a connection to the cluster. All tables 
and interfaces
-   * created from returned connection share zookeeper connection, meta cache, 
and connections to
-   * region servers and masters. <br>
+   * created from returned connection share zookeeper connection(if used), 
meta cache, and
+   * connections to region servers and masters. <br>
    * The caller is responsible for calling {@link Connection#close()} on the 
returned connection
    * instance. Typical usage:
    *
    * <pre>
-   * Connection connection = ConnectionFactory.createConnection(conf);
-   * Table table = connection.getTable(TableName.valueOf("mytable"));
-   * try {
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
+   *   table.get(...);
+   *   ...
+   * }
+   * </pre>
+   *
+   * @param connectionUri the connection uri for the hbase cluster
+   * @param conf          configuration
+   * @return Connection object for <code>conf</code>
+   */
+  public static Connection createConnection(URI connectionUri, Configuration 
conf)
+    throws IOException {
+    return createConnection(connectionUri, conf, null, 
AuthUtil.loginClient(conf));
+  }
+
+  /**
+   * Create a new Connection instance using the passed <code>conf</code> 
instance. Connection
+   * encapsulates all housekeeping for a connection to the cluster. All tables 
and interfaces
+   * created from returned connection share zookeeper connection(if used), 
meta cache, and
+   * connections to region servers and masters. <br>
+   * The caller is responsible for calling {@link Connection#close()} on the 
returned connection
+   * instance. Typical usage:
+   *
+   * <pre>
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
    *   table.get(...);
    *   ...
-   * } finally {
-   *   table.close();
-   *   connection.close();
    * }
    * </pre>
    *
@@ -166,20 +202,42 @@ public class ConnectionFactory {
   /**
    * Create a new Connection instance using the passed <code>conf</code> 
instance. Connection
    * encapsulates all housekeeping for a connection to the cluster. All tables 
and interfaces
-   * created from returned connection share zookeeper connection, meta cache, 
and connections to
-   * region servers and masters. <br>
+   * created from returned connection share zookeeper connection(if used), 
meta cache, and
+   * connections to region servers and masters. <br>
    * The caller is responsible for calling {@link Connection#close()} on the 
returned connection
    * instance. Typical usage:
    *
    * <pre>
-   * Connection connection = ConnectionFactory.createConnection(conf);
-   * Table table = connection.getTable(TableName.valueOf("table1"));
-   * try {
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
+   *   table.get(...);
+   *   ...
+   * }
+   * </pre>
+   *
+   * @param connectionUri the connection uri for the hbase cluster
+   * @param conf          configuration
+   * @param pool          the thread pool to use for batch operations
+   * @return Connection object for <code>conf</code>
+   */
+  public static Connection createConnection(URI connectionUri, Configuration 
conf,
+    ExecutorService pool) throws IOException {
+    return createConnection(connectionUri, conf, pool, 
AuthUtil.loginClient(conf));
+  }
+
+  /**
+   * Create a new Connection instance using the passed <code>conf</code> 
instance. Connection
+   * encapsulates all housekeeping for a connection to the cluster. All tables 
and interfaces
+   * created from returned connection share zookeeper connection(if used), 
meta cache, and
+   * connections to region servers and masters. <br>
+   * The caller is responsible for calling {@link Connection#close()} on the 
returned connection
+   * instance. Typical usage:
+   *
+   * <pre>
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
    *   table.get(...);
    *   ...
-   * } finally {
-   *   table.close();
-   *   connection.close();
    * }
    * </pre>
    *
@@ -194,20 +252,42 @@ public class ConnectionFactory {
   /**
    * Create a new Connection instance using the passed <code>conf</code> 
instance. Connection
    * encapsulates all housekeeping for a connection to the cluster. All tables 
and interfaces
-   * created from returned connection share zookeeper connection, meta cache, 
and connections to
-   * region servers and masters. <br>
+   * created from returned connection share zookeeper connection(if used), 
meta cache, and
+   * connections to region servers and masters. <br>
    * The caller is responsible for calling {@link Connection#close()} on the 
returned connection
    * instance. Typical usage:
    *
    * <pre>
-   * Connection connection = ConnectionFactory.createConnection(conf);
-   * Table table = connection.getTable(TableName.valueOf("table1"));
-   * try {
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
+   *   table.get(...);
+   *   ...
+   * }
+   * </pre>
+   *
+   * @param connectionUri the connection uri for the hbase cluster
+   * @param conf          configuration
+   * @param user          the user the connection is for
+   * @return Connection object for <code>conf</code>
+   */
+  public static Connection createConnection(URI connectionUri, Configuration 
conf, User user)
+    throws IOException {
+    return createConnection(connectionUri, conf, null, user);
+  }
+
+  /**
+   * Create a new Connection instance using the passed <code>conf</code> 
instance. Connection
+   * encapsulates all housekeeping for a connection to the cluster. All tables 
and interfaces
+   * created from returned connection share zookeeper connection(if used), 
meta cache, and
+   * connections to region servers and masters. <br>
+   * The caller is responsible for calling {@link Connection#close()} on the 
returned connection
+   * instance. Typical usage:
+   *
+   * <pre>
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
    *   table.get(...);
    *   ...
-   * } finally {
-   *   table.close();
-   *   connection.close();
    * }
    * </pre>
    *
@@ -224,20 +304,43 @@ public class ConnectionFactory {
   /**
    * Create a new Connection instance using the passed <code>conf</code> 
instance. Connection
    * encapsulates all housekeeping for a connection to the cluster. All tables 
and interfaces
-   * created from returned connection share zookeeper connection, meta cache, 
and connections to
-   * region servers and masters. <br>
+   * created from returned connection share zookeeper connection(if used), 
meta cache, and
+   * connections to region servers and masters. <br>
    * The caller is responsible for calling {@link Connection#close()} on the 
returned connection
    * instance. Typical usage:
    *
    * <pre>
-   * Connection connection = ConnectionFactory.createConnection(conf);
-   * Table table = connection.getTable(TableName.valueOf("table1"));
-   * try {
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
+   *   table.get(...);
+   *   ...
+   * }
+   * </pre>
+   *
+   * @param connectionUri the connection uri for the hbase cluster
+   * @param conf          configuration
+   * @param user          the user the connection is for
+   * @param pool          the thread pool to use for batch operations
+   * @return Connection object for <code>conf</code>
+   */
+  public static Connection createConnection(URI connectionUri, Configuration 
conf,
+    ExecutorService pool, User user) throws IOException {
+    return createConnection(connectionUri, conf, pool, user, 
Collections.emptyMap());
+  }
+
+  /**
+   * Create a new Connection instance using the passed <code>conf</code> 
instance. Connection
+   * encapsulates all housekeeping for a connection to the cluster. All tables 
and interfaces
+   * created from returned connection share zookeeper connection(if used), 
meta cache, and
+   * connections to region servers and masters. <br>
+   * The caller is responsible for calling {@link Connection#close()} on the 
returned connection
+   * instance. Typical usage:
+   *
+   * <pre>
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
    *   table.get(...);
    *   ...
-   * } finally {
-   *   table.close();
-   *   connection.close();
    * }
    * </pre>
    *
@@ -249,6 +352,37 @@ public class ConnectionFactory {
    */
   public static Connection createConnection(Configuration conf, 
ExecutorService pool,
     final User user, Map<String, byte[]> connectionAttributes) throws 
IOException {
+    return createConnection(null, conf, pool, user, connectionAttributes);
+  }
+
+  /**
+   * Create a new Connection instance using the passed <code>conf</code> 
instance. Connection
+   * encapsulates all housekeeping for a connection to the cluster. All tables 
and interfaces
+   * created from returned connection share zookeeper connection(if used), 
meta cache, and
+   * connections to region servers and masters. <br>
+   * The caller is responsible for calling {@link Connection#close()} on the 
returned connection
+   * instance. Typical usage:
+   *
+   * <pre>
+   * try (
+   *   Connection connection =
+   *     ConnectionFactory.createConnection(connectionUri, conf, pool, user, connectionAttributes);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
+   *   table.get(...);
+   *   ...
+   * }
+   * </pre>
+   *
+   * @param connectionUri        the connection uri for the hbase cluster
+   * @param conf                 configuration
+   * @param user                 the user the connection is for
+   * @param pool                 the thread pool to use for batch operations
+   * @param connectionAttributes attributes to be sent along to server during 
connection establish
+   * @return Connection object for <code>conf</code>
+   */
+  public static Connection createConnection(URI connectionUri, Configuration 
conf,
+    ExecutorService pool, final User user, Map<String, byte[]> 
connectionAttributes)
+    throws IOException {
     Class<?> clazz = 
conf.getClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
       ConnectionOverAsyncConnection.class, Connection.class);
     if (clazz != ConnectionOverAsyncConnection.class) {
@@ -263,7 +397,7 @@ public class ConnectionFactory {
         throw new IOException(e);
       }
     } else {
-      return FutureUtils.get(createAsyncConnection(conf, user, 
connectionAttributes))
+      return FutureUtils.get(createAsyncConnection(connectionUri, conf, user, 
connectionAttributes))
         .toConnection();
     }
   }
@@ -277,6 +411,16 @@ public class ConnectionFactory {
     return createAsyncConnection(HBaseConfiguration.create());
   }
 
+  /**
+   * Call {@link #createAsyncConnection(URI, Configuration)} using default 
HBaseConfiguration.
+   * @param connectionUri the connection uri for the hbase cluster
+   * @see #createAsyncConnection(URI, Configuration)
+   * @return AsyncConnection object wrapped by CompletableFuture
+   */
+  public static CompletableFuture<AsyncConnection> createAsyncConnection(URI 
connectionUri) {
+    return createAsyncConnection(connectionUri, HBaseConfiguration.create());
+  }
+
   /**
    * Call {@link #createAsyncConnection(Configuration, User)} using the given 
{@code conf} and a
    * User object created by {@link UserProvider}. The given {@code conf} will 
also be used to
@@ -287,6 +431,21 @@ public class ConnectionFactory {
    * @see UserProvider
    */
   public static CompletableFuture<AsyncConnection> 
createAsyncConnection(Configuration conf) {
+    return createAsyncConnection(null, conf);
+  }
+
+  /**
+   * Call {@link #createAsyncConnection(URI, Configuration, User)} using the given
+   * {@code connectionUri}, {@code conf} and a User object created by {@link UserProvider}. The
+   * given {@code conf} will also be used to initialize the {@link UserProvider}.
+   * @param connectionUri the connection uri for the hbase cluster
+   * @param conf          configuration
+   * @return AsyncConnection object wrapped by CompletableFuture
+   * @see #createAsyncConnection(URI, Configuration, User)
+   * @see UserProvider
+   */
+  public static CompletableFuture<AsyncConnection> createAsyncConnection(URI 
connectionUri,
+    Configuration conf) {
     User user;
     try {
       user = AuthUtil.loginClient(conf);
@@ -295,7 +454,7 @@ public class ConnectionFactory {
       future.completeExceptionally(e);
       return future;
     }
-    return createAsyncConnection(conf, user);
+    return createAsyncConnection(connectionUri, conf, user);
   }
 
   /**
@@ -315,7 +474,28 @@ public class ConnectionFactory {
    */
   public static CompletableFuture<AsyncConnection> 
createAsyncConnection(Configuration conf,
     final User user) {
-    return createAsyncConnection(conf, user, null);
+    return createAsyncConnection(null, conf, user);
+  }
+
+  /**
+   * Create a new AsyncConnection instance using the passed {@code 
connectionUri}, {@code conf} and
+   * {@code user}. AsyncConnection encapsulates all housekeeping for a 
connection to the cluster.
+   * All tables and interfaces created from returned connection share 
zookeeper connection(if used),
+   * meta cache, and connections to region servers and masters.
+   * <p>
+   * The caller is responsible for calling {@link AsyncConnection#close()} on 
the returned
+   * connection instance.
+   * <p>
+   * Usually you should only create one AsyncConnection instance in your code 
and use it everywhere
+   * as it is thread safe.
+   * @param connectionUri the connection uri for the hbase cluster
+   * @param conf          configuration
+   * @param user          the user the asynchronous connection is for
+   * @return AsyncConnection object wrapped by CompletableFuture
+   */
+  public static CompletableFuture<AsyncConnection> createAsyncConnection(URI 
connectionUri,
+    Configuration conf, final User user) {
+    return createAsyncConnection(connectionUri, conf, user, null);
   }
 
   /**
@@ -336,9 +516,38 @@ public class ConnectionFactory {
    */
   public static CompletableFuture<AsyncConnection> 
createAsyncConnection(Configuration conf,
     final User user, Map<String, byte[]> connectionAttributes) {
+    return createAsyncConnection(null, conf, user, connectionAttributes);
+  }
+
+  /**
+   * Create a new AsyncConnection instance using the passed {@code 
connectionUri}, {@code conf} and
+   * {@code user}. AsyncConnection encapsulates all housekeeping for a 
connection to the cluster.
+   * All tables and interfaces created from returned connection share 
zookeeper connection(if used),
+   * meta cache, and connections to region servers and masters.
+   * <p>
+   * The caller is responsible for calling {@link AsyncConnection#close()} on 
the returned
+   * connection instance.
+   * <p>
+   * Usually you should only create one AsyncConnection instance in your code 
and use it everywhere
+   * as it is thread safe.
+   * @param connectionUri        the connection uri for the hbase cluster
+   * @param conf                 configuration
+   * @param user                 the user the asynchronous connection is for
+   * @param connectionAttributes attributes to be sent along to server during 
connection establish
+   * @return AsyncConnection object wrapped by CompletableFuture
+   */
+  public static CompletableFuture<AsyncConnection> createAsyncConnection(URI 
connectionUri,
+    Configuration conf, final User user, Map<String, byte[]> 
connectionAttributes) {
     return TraceUtil.tracedFuture(() -> {
+      ConnectionRegistry registry;
+      try {
+        registry = connectionUri != null
+          ? ConnectionRegistryFactory.create(connectionUri, conf, user)
+          : ConnectionRegistryFactory.create(conf, user);
+      } catch (Exception e) {
+        return FutureUtils.failedFuture(e);
+      }
       CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
-      ConnectionRegistry registry = 
ConnectionRegistryFactory.getRegistry(conf, user);
       addListener(registry.getClusterId(), (clusterId, error) -> {
         if (error != null) {
           registry.close();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
index 415d46397b8..5eef2c5f93e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
@@ -17,27 +17,77 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static 
org.apache.hadoop.hbase.HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
-
+import java.io.IOException;
+import java.net.URI;
+import java.util.ServiceLoader;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 
 /**
- * Factory class to get the instance of configured connection registry.
+ * The entry point for creating a {@link ConnectionRegistry}.
  */
 @InterfaceAudience.Private
 final class ConnectionRegistryFactory {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(ConnectionRegistryFactory.class);
+
+  private static final ImmutableMap<String, ConnectionRegistryURIFactory> 
CREATORS;
+  static {
+    ImmutableMap.Builder<String, ConnectionRegistryURIFactory> builder = 
ImmutableMap.builder();
+    for (ConnectionRegistryURIFactory factory : ServiceLoader
+      .load(ConnectionRegistryURIFactory.class)) {
+      builder.put(factory.getScheme().toLowerCase(), factory);
+    }
+    // throw IllegalArgumentException if there are duplicated keys
+    CREATORS = builder.buildOrThrow();
+  }
+
   private ConnectionRegistryFactory() {
   }
 
-  /** Returns The connection registry implementation to use. */
-  static ConnectionRegistry getRegistry(Configuration conf, User user) {
+  /**
+   * Returns the connection registry implementation to use, for the given 
connection url
+   * {@code uri}.
+   * <p/>
+   * We use {@link ServiceLoader} to load different implementations, and use 
the scheme of the given
+   * {@code uri} to select. And if there is no protocol specified, or we can 
not find a
+   * {@link ConnectionRegistryURIFactory} implementation for the given scheme, 
we will fallback to
+   * use the old way to create the {@link ConnectionRegistry}. Notice that, if 
fallback happens, the
+   * specified connection url {@code uri} will not take effect, we will load 
all the related
+   * configurations from the given Configuration instance {@code conf}
+   */
+  static ConnectionRegistry create(URI uri, Configuration conf, User user) 
throws IOException {
+    if (StringUtils.isBlank(uri.getScheme())) {
+      LOG.warn("No scheme specified for {}, fallback to use old way", uri);
+      return create(conf, user);
+    }
+    ConnectionRegistryURIFactory creator = 
CREATORS.get(uri.getScheme().toLowerCase());
+    if (creator == null) {
+      LOG.warn("No creator registered for {}, fallback to use old way", uri);
+      return create(conf, user);
+    }
+    return creator.create(uri, conf, user);
+  }
+
+  /**
+   * Returns the connection registry implementation to use.
+   * <p/>
+   * This is used when we do not have a connection url, we will use the old 
way to load the
+   * connection registry, by checking the
+   * {@literal HConstants#CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY} 
configuration.
+   */
+  static ConnectionRegistry create(Configuration conf, User user) {
     Class<? extends ConnectionRegistry> clazz =
-      conf.getClass(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 
RpcConnectionRegistry.class,
-        ConnectionRegistry.class);
+      conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+        RpcConnectionRegistry.class, ConnectionRegistry.class);
     return ReflectionUtils.newInstance(clazz, conf, user);
   }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryURIFactory.java
similarity index 60%
copy from hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
copy to hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryURIFactory.java
index 415d46397b8..ab2037a1c13 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryURIFactory.java
@@ -17,27 +17,26 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static 
org.apache.hadoop.hbase.HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
-
+import java.io.IOException;
+import java.net.URI;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
- * Factory class to get the instance of configured connection registry.
+ * For creating different {@link ConnectionRegistry} implementation.
  */
 @InterfaceAudience.Private
-final class ConnectionRegistryFactory {
+public interface ConnectionRegistryURIFactory {
 
-  private ConnectionRegistryFactory() {
-  }
+  /**
+   * Instantiate the {@link ConnectionRegistry} using the given parameters.
+   */
+  ConnectionRegistry create(URI uri, Configuration conf, User user) throws 
IOException;
 
-  /** Returns The connection registry implementation to use. */
-  static ConnectionRegistry getRegistry(Configuration conf, User user) {
-    Class<? extends ConnectionRegistry> clazz =
-      conf.getClass(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 
RpcConnectionRegistry.class,
-        ConnectionRegistry.class);
-    return ReflectionUtils.newInstance(clazz, conf, user);
-  }
+  /**
+   * The scheme for this implementation. Used to register this URI factory to 
the
+   * {@link ConnectionRegistryFactory}.
+   */
+  String getScheme();
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistryCreator.java
similarity index 54%
copy from hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
copy to hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistryCreator.java
index 415d46397b8..cb2338b1429 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistryCreator.java
@@ -17,27 +17,33 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static 
org.apache.hadoop.hbase.HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
-
+import java.io.IOException;
+import java.net.URI;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * Factory class to get the instance of configured connection registry.
+ * Connection registry creator implementation for creating {@link 
RpcConnectionRegistry}.
  */
 @InterfaceAudience.Private
-final class ConnectionRegistryFactory {
+public class RpcConnectionRegistryCreator implements 
ConnectionRegistryURIFactory {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RpcConnectionRegistryCreator.class);
 
-  private ConnectionRegistryFactory() {
+  @Override
+  public ConnectionRegistry create(URI uri, Configuration conf, User user) 
throws IOException {
+    assert getScheme().equals(uri.getScheme());
+    LOG.debug("connect to hbase cluster with rpc bootstrap servers='{}'", 
uri.getAuthority());
+    Configuration c = new Configuration(conf);
+    c.set(RpcConnectionRegistry.BOOTSTRAP_NODES, uri.getAuthority());
+    return new RpcConnectionRegistry(c, user);
   }
 
-  /** Returns The connection registry implementation to use. */
-  static ConnectionRegistry getRegistry(Configuration conf, User user) {
-    Class<? extends ConnectionRegistry> clazz =
-      conf.getClass(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 
RpcConnectionRegistry.class,
-        ConnectionRegistry.class);
-    return ReflectionUtils.newInstance(clazz, conf, user);
+  @Override
+  public String getScheme() {
+    return "hbase+rpc";
   }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistryCreator.java
similarity index 51%
copy from hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
copy to hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistryCreator.java
index 415d46397b8..8aa51e04fe4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistryCreator.java
@@ -17,27 +17,36 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
-
+import java.io.IOException;
+import java.net.URI;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * Factory class to get the instance of configured connection registry.
+ * Connection registry creator implementation for creating {@link ZKConnectionRegistry}.
  */
 @InterfaceAudience.Private
-final class ConnectionRegistryFactory {
+public class ZKConnectionRegistryCreator implements ConnectionRegistryURIFactory {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ZKConnectionRegistryCreator.class);
 
-  private ConnectionRegistryFactory() {
+  @Override
+  public ConnectionRegistry create(URI uri, Configuration conf, User user) throws IOException {
+    assert getScheme().equals(uri.getScheme());
+    LOG.debug("connect to hbase cluster with zk quorum='{}' and parent='{}'", uri.getAuthority(),
+      uri.getPath());
+    Configuration c = new Configuration(conf);
+    c.set(HConstants.CLIENT_ZOOKEEPER_QUORUM, uri.getAuthority());
+    c.set(HConstants.ZOOKEEPER_ZNODE_PARENT, uri.getPath());
+    return new ZKConnectionRegistry(c, user);
   }
 
-  /** Returns The connection registry implementation to use. */
-  static ConnectionRegistry getRegistry(Configuration conf, User user) {
-    Class<? extends ConnectionRegistry> clazz =
-      conf.getClass(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, RpcConnectionRegistry.class,
-        ConnectionRegistry.class);
-    return ReflectionUtils.newInstance(clazz, conf, user);
+  @Override
+  public String getScheme() {
+    return "hbase+zk";
   }
 }
diff --git a/hbase-client/src/main/resources/META-INF/services/org.apache.hadoop.hbase.client.ConnectionRegistryURIFactory b/hbase-client/src/main/resources/META-INF/services/org.apache.hadoop.hbase.client.ConnectionRegistryURIFactory
new file mode 100644
index 00000000000..b25a569776f
--- /dev/null
+++ b/hbase-client/src/main/resources/META-INF/services/org.apache.hadoop.hbase.client.ConnectionRegistryURIFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.hadoop.hbase.client.RpcConnectionRegistryCreator
+org.apache.hadoop.hbase.client.ZKConnectionRegistryCreator
\ No newline at end of file
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryCreatorUriParsing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryCreatorUriParsing.java
new file mode 100644
index 00000000000..4dabd894b5b
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryCreatorUriParsing.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.mockStatic;
+
+import java.net.URI;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentCaptor;
+import org.mockito.MockedConstruction;
+import org.mockito.MockedStatic;
+
+/**
+ * Make sure we can successfully parse the URI component
+ */
+@Category({ ClientTests.class, SmallTests.class })
+public class TestConnectionRegistryCreatorUriParsing {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestConnectionRegistryCreatorUriParsing.class);
+
+  private Configuration conf;
+
+  private User user;
+
+  private MockedConstruction<RpcConnectionRegistry> mockedRpcRegistry;
+
+  private MockedConstruction<ZKConnectionRegistry> mockedZkRegistry;
+
+  private MockedStatic<ReflectionUtils> mockedReflectionUtils;
+
+  private List<?> args;
+
+  @Before
+  public void setUp() {
+    conf = HBaseConfiguration.create();
+    user = mock(User.class);
+    args = null;
+    mockedRpcRegistry = mockConstruction(RpcConnectionRegistry.class, (mock, context) -> {
+      args = context.arguments();
+    });
+    mockedZkRegistry = mockConstruction(ZKConnectionRegistry.class, (mock, context) -> {
+      args = context.arguments();
+    });
+    mockedReflectionUtils = mockStatic(ReflectionUtils.class);
+  }
+
+  @After
+  public void tearDown() {
+    mockedRpcRegistry.closeOnDemand();
+    mockedZkRegistry.closeOnDemand();
+    mockedReflectionUtils.closeOnDemand();
+  }
+
+  @Test
+  public void testParseRpcSingle() throws Exception {
+    ConnectionRegistryFactory.create(new URI("hbase+rpc://server1:123"), conf, user);
+    assertEquals(1, mockedRpcRegistry.constructed().size());
+    assertSame(user, args.get(1));
+    Configuration conf = (Configuration) args.get(0);
+    assertEquals("server1:123", conf.get(RpcConnectionRegistry.BOOTSTRAP_NODES));
+  }
+
+  @Test
+  public void testParseRpcMultiple() throws Exception {
+    ConnectionRegistryFactory.create(new URI("hbase+rpc://server1:123,server2:456,server3:789"),
+      conf, user);
+    assertEquals(1, mockedRpcRegistry.constructed().size());
+    assertSame(user, args.get(1));
+    Configuration conf = (Configuration) args.get(0);
+    assertEquals("server1:123,server2:456,server3:789",
+      conf.get(RpcConnectionRegistry.BOOTSTRAP_NODES));
+  }
+
+  @Test
+  public void testParseZkSingle() throws Exception {
+    ConnectionRegistryFactory.create(new URI("hbase+zk://server1:123/root"), conf, user);
+    assertEquals(1, mockedZkRegistry.constructed().size());
+    assertSame(user, args.get(1));
+    Configuration conf = (Configuration) args.get(0);
+    assertEquals("server1:123", conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM));
+    assertEquals("/root", conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+  }
+
+  @Test
+  public void testParseZkMultiple() throws Exception {
+    ConnectionRegistryFactory
+      .create(new URI("hbase+zk://server1:123,server2:456,server3:789/root/path"), conf, user);
+    assertEquals(1, mockedZkRegistry.constructed().size());
+    assertSame(user, args.get(1));
+    Configuration conf = (Configuration) args.get(0);
+    assertEquals("server1:123,server2:456,server3:789",
+      conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM));
+    assertEquals("/root/path", conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+  }
+
+  @Test
+  public void testFallbackNoScheme() throws Exception {
+    conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class,
+      ConnectionRegistry.class);
+    ConnectionRegistryFactory.create(new URI("server1:2181/path"), conf, user);
+    ArgumentCaptor<Class<?>> clazzCaptor = ArgumentCaptor.forClass(Class.class);
+    ArgumentCaptor<Object[]> argsCaptor = ArgumentCaptor.forClass(Object[].class);
+    mockedReflectionUtils
+      .verify(() -> ReflectionUtils.newInstance(clazzCaptor.capture(), argsCaptor.capture()));
+    assertEquals(ZKConnectionRegistry.class, clazzCaptor.getValue());
+    assertSame(conf, argsCaptor.getValue()[0]);
+    assertSame(user, argsCaptor.getValue()[1]);
+  }
+
+  @Test
+  public void testFallbackNoCreator() throws Exception {
+    conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, RpcConnectionRegistry.class,
+      ConnectionRegistry.class);
+    ConnectionRegistryFactory.create(new URI("hbase+tls://server1:123/path"), conf, user);
+    ArgumentCaptor<Class<?>> clazzCaptor = ArgumentCaptor.forClass(Class.class);
+    ArgumentCaptor<Object[]> argsCaptor = ArgumentCaptor.forClass(Object[].class);
+    mockedReflectionUtils
+      .verify(() -> ReflectionUtils.newInstance(clazzCaptor.capture(), argsCaptor.capture()));
+    assertEquals(RpcConnectionRegistry.class, clazzCaptor.getValue());
+    assertSame(conf, argsCaptor.getValue()[0]);
+    assertSame(user, argsCaptor.getValue()[1]);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
index 7225f92b7ff..ed90863763a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
@@ -64,7 +64,7 @@ public final class ClusterConnectionFactory {
    */
   public static AsyncClusterConnection createAsyncClusterConnection(Configuration conf,
     SocketAddress localAddress, User user) throws IOException {
-    return createAsyncClusterConnection(conf, ConnectionRegistryFactory.getRegistry(conf, user),
+    return createAsyncClusterConnection(conf, ConnectionRegistryFactory.create(conf, user),
       localAddress, user);
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java
index 0ff105743e0..031dff736c8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java
@@ -60,7 +60,7 @@ public abstract class AbstractTestRegionLocator {
     UTIL.getAdmin().createTable(td, SPLIT_KEYS);
     UTIL.waitTableAvailable(TABLE_NAME);
     try (ConnectionRegistry registry =
-      ConnectionRegistryFactory.getRegistry(UTIL.getConfiguration(), User.getCurrent())) {
+      ConnectionRegistryFactory.create(UTIL.getConfiguration(), User.getCurrent())) {
       RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry);
     }
     UTIL.getAdmin().balancerSwitch(false, true);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java
index da400f29c0c..bb0eb31d254 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java
@@ -56,7 +56,7 @@ public class TestAsyncAdminWithRegionReplicas extends TestAsyncAdminBase {
     TestAsyncAdminBase.setUpBeforeClass();
     HBaseTestingUtil.setReplicas(TEST_UTIL.getAdmin(), TableName.META_TABLE_NAME, 3);
     try (ConnectionRegistry registry =
-      ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent())) {
+      ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent())) {
       RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, registry);
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
index 90d2cb51e8c..e14cd32a388 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java
@@ -107,8 +107,7 @@ public class TestAsyncMetaRegionLocator {
       testUtil = miniClusterRule.getTestingUtility();
       HBaseTestingUtil.setReplicas(admin, TableName.META_TABLE_NAME, 3);
       testUtil.waitUntilNoRegionsInTransition();
-      registry =
-        ConnectionRegistryFactory.getRegistry(testUtil.getConfiguration(), User.getCurrent());
+      registry = ConnectionRegistryFactory.create(testUtil.getConfiguration(), User.getCurrent());
       RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(testUtil, registry);
       admin.balancerSwitch(false).get();
       locator = new AsyncMetaRegionLocator(registry);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
index a6d0ab81f91..6a5230b3a12 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
@@ -128,7 +128,7 @@ public class TestAsyncNonMetaRegionLocator {
     // Enable meta replica LoadBalance mode for this connection.
     c.set(RegionLocator.LOCATOR_META_REPLICAS_MODE, metaReplicaMode.toString());
     ConnectionRegistry registry =
-      ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent());
+      ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent());
     conn =
       new AsyncConnectionImpl(c, registry, registry.getClusterId().get(), null, User.getCurrent());
     locator = new AsyncNonMetaRegionLocator(conn);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
index 50c9ab9f565..439d527effc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
@@ -125,7 +125,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
     TEST_UTIL.startMiniCluster(3);
     TEST_UTIL.getAdmin().balancerSwitch(false, true);
     ConnectionRegistry registry =
-      ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent());
+      ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent());
     CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
       registry.getClusterId().get(), null, User.getCurrent());
     LOCATOR = new AsyncNonMetaRegionLocator(CONN);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
index bacd7bb32d7..2291c28a7c8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
@@ -100,7 +100,7 @@ public class TestAsyncRegionLocator {
     TEST_UTIL.createTable(TABLE_NAME, FAMILY);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
     ConnectionRegistry registry =
-      ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent());
+      ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent());
     CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
       registry.getClusterId().get(), null, User.getCurrent());
     LOCATOR = CONN.getLocator();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
index 3c8327145f3..baa4ee74ade 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
@@ -73,7 +73,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
     TEST_UTIL.createTable(TABLE_NAME, FAMILY);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
     ConnectionRegistry registry =
-      ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent());
+      ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent());
     CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
       registry.getClusterId().get(), null, User.getCurrent());
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java
index 0de59a4c32b..2803db20e71 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java
@@ -95,8 +95,7 @@ public class TestAsyncTableUseMetaReplicas {
       FailPrimaryMetaScanCp.class.getName());
     UTIL.startMiniCluster(3);
     HBaseTestingUtil.setReplicas(UTIL.getAdmin(), TableName.META_TABLE_NAME, 3);
-    try (ConnectionRegistry registry =
-      ConnectionRegistryFactory.getRegistry(conf, User.getCurrent())) {
+    try (ConnectionRegistry registry = ConnectionRegistryFactory.create(conf, User.getCurrent())) {
       RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry);
     }
     try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBasicReadWriteWithDifferentConnectionRegistries.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBasicReadWriteWithDifferentConnectionRegistries.java
new file mode 100644
index 00000000000..5746ffa67f6
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBasicReadWriteWithDifferentConnectionRegistries.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNameTestRule;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test basic read write operation with different {@link ConnectionRegistry} implementations.
+ */
+@RunWith(Parameterized.class)
+@Category({ MediumTests.class, ClientTests.class })
+public class TestBasicReadWriteWithDifferentConnectionRegistries {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBasicReadWriteWithDifferentConnectionRegistries.class);
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestBasicReadWriteWithDifferentConnectionRegistries.class);
+
+  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+  public enum RegistryImpl {
+    ZK,
+    RPC,
+    ZK_URI,
+    RPC_URI
+  }
+
+  @Parameter
+  public RegistryImpl impl;
+
+  @Rule
+  public final TableNameTestRule name = new TableNameTestRule();
+
+  private byte[] FAMILY = Bytes.toBytes("family");
+
+  private Connection conn;
+
+  @Parameters(name = "{index}: impl={0}")
+  public static List<Object[]> data() {
+    List<Object[]> data = new ArrayList<Object[]>();
+    for (RegistryImpl impl : RegistryImpl.values()) {
+      data.add(new Object[] { impl });
+    }
+    return data;
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    switch (impl) {
+      case ZK: {
+        Configuration conf = HBaseConfiguration.create();
+        conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+          ZKConnectionRegistry.class, ConnectionRegistry.class);
+        String quorum = UTIL.getZkCluster().getAddress().toString();
+        String path = UTIL.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT);
+        conf.set(HConstants.CLIENT_ZOOKEEPER_QUORUM, quorum);
+        conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, path);
+        LOG.info("connect to cluster through zk quorum={} and parent={}", quorum, path);
+        conn = ConnectionFactory.createConnection(conf);
+        break;
+      }
+      case RPC: {
+        Configuration conf = HBaseConfiguration.create();
+        conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+          RpcConnectionRegistry.class, ConnectionRegistry.class);
+        String bootstrapServers =
+          UTIL.getMiniHBaseCluster().getMaster().getServerName().getAddress().toString();
+        conf.set(RpcConnectionRegistry.BOOTSTRAP_NODES, bootstrapServers);
+        LOG.info("connect to cluster through rpc bootstrap servers={}", bootstrapServers);
+        conn = ConnectionFactory.createConnection(conf);
+        break;
+      }
+      case ZK_URI: {
+        String quorum = UTIL.getZkCluster().getAddress().toString();
+        String path = UTIL.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT);
+        URI connectionUri = new URI("hbase+zk://" + quorum + path);
+        LOG.info("connect to cluster through connection url: {}", connectionUri);
+        conn = ConnectionFactory.createConnection(connectionUri);
+        break;
+      }
+      case RPC_URI: {
+        URI connectionUri = new URI("hbase+rpc://"
+          + UTIL.getMiniHBaseCluster().getMaster().getServerName().getAddress().toString());
+        LOG.info("connect to cluster through connection url: {}", connectionUri);
+        conn = ConnectionFactory.createConnection(connectionUri);
+        break;
+      }
+      default:
+        throw new IllegalArgumentException("Unknown impl: " + impl);
+    }
+    try (Admin admin = conn.getAdmin()) {
+      admin.createTable(TableDescriptorBuilder.newBuilder(name.getTableName())
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build());
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    TableName tableName = name.getTableName();
+    try (Admin admin = conn.getAdmin()) {
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    }
+    conn.close();
+  }
+
+  @Test
+  public void testReadWrite() throws Exception {
+    byte[] row = Bytes.toBytes("row");
+    byte[] qualifier = Bytes.toBytes("qualifier");
+    byte[] value = Bytes.toBytes("value");
+    try (Table table = conn.getTable(name.getTableName())) {
+      Put put = new Put(row).addColumn(FAMILY, qualifier, value);
+      table.put(put);
+      Result result = table.get(new Get(row));
+      assertArrayEquals(value, result.getValue(FAMILY, qualifier));
+      table.delete(new Delete(row));
+      assertFalse(table.exists(new Get(row)));
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java
index 5c78e53f7e6..12f278ebbfd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java
@@ -77,8 +77,7 @@ public class TestCatalogReplicaLoadBalanceSimpleSelector {
       () -> TEST_UTIL.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME).size()
           >= numOfMetaReplica);
 
-    registry =
-      ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent());
+    registry = ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent());
     CONN = new AsyncConnectionImpl(conf, registry, registry.getClusterId().get(), null,
       User.getCurrent());
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
index beb054eaf36..29223dea5db 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
@@ -64,8 +64,7 @@ public class TestMetaRegionLocationCache {
   public static void setUp() throws Exception {
     TEST_UTIL.startMiniCluster(3);
     HBaseTestingUtil.setReplicas(TEST_UTIL.getAdmin(), TableName.META_TABLE_NAME, 3);
-    REGISTRY =
-      ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent());
+    REGISTRY = ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent());
     RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY);
     TEST_UTIL.getAdmin().balancerSwitch(false, true);
   }

Reply via email to