This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch branch-3 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-3 by this push: new 46377c5d966 HBASE-28436 Use connection url to specify the connection registry information (#5770) 46377c5d966 is described below commit 46377c5d96683a283c969a5d7c5ce5b12cfecedb Author: Duo Zhang <zhang...@apache.org> AuthorDate: Tue Apr 23 16:05:26 2024 +0800 HBASE-28436 Use connection url to specify the connection registry information (#5770) Signed-off-by: Istvan Toth <st...@apache.org> Signed-off-by: Nick Dimiduk <ndimi...@apache.org> Reviewed-by: Bryan Beaudreault <bbeaudrea...@apache.org> (cherry picked from commit e3761baec1158d617c46bbdf54725206544717e9) --- hbase-client/pom.xml | 5 + .../hadoop/hbase/client/ConnectionFactory.java | 313 +++++++++++++++++---- .../hbase/client/ConnectionRegistryFactory.java | 64 ++++- ...tory.java => ConnectionRegistryURIFactory.java} | 27 +- ...tory.java => RpcConnectionRegistryCreator.java} | 30 +- ...ctory.java => ZKConnectionRegistryCreator.java} | 33 ++- ...adoop.hbase.client.ConnectionRegistryURIFactory | 17 ++ .../TestConnectionRegistryCreatorUriParsing.java | 157 +++++++++++ .../hbase/client/ClusterConnectionFactory.java | 2 +- .../hbase/client/AbstractTestRegionLocator.java | 2 +- .../client/TestAsyncAdminWithRegionReplicas.java | 2 +- .../hbase/client/TestAsyncMetaRegionLocator.java | 3 +- .../client/TestAsyncNonMetaRegionLocator.java | 2 +- ...stAsyncNonMetaRegionLocatorConcurrenyLimit.java | 2 +- .../hbase/client/TestAsyncRegionLocator.java | 2 +- .../TestAsyncSingleRequestRpcRetryingCaller.java | 2 +- .../client/TestAsyncTableUseMetaReplicas.java | 3 +- ...ReadWriteWithDifferentConnectionRegistries.java | 177 ++++++++++++ ...estCatalogReplicaLoadBalanceSimpleSelector.java | 3 +- .../hbase/client/TestMetaRegionLocationCache.java | 3 +- 20 files changed, 737 insertions(+), 112 deletions(-) diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml index dba79ff07ee..8da4ab8e837 100644 --- a/hbase-client/pom.xml +++ b/hbase-client/pom.xml @@ -170,6 +170,11 @@ <artifactId>mockito-core</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-inline</artifactId> + <scope>test</scope> + </dependency> <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-library</artifactId> diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java index 716fb4863fe..f4ef4496dfc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.io.IOException; import java.lang.reflect.Constructor; +import java.net.URI; import java.security.PrivilegedExceptionAction; import java.util.Collections; import java.util.Map; @@ -89,41 +90,55 @@ public class ConnectionFactory { * instance. Typical usage: * * <pre> - * Connection connection = ConnectionFactory.createConnection(); - * Table table = connection.getTable(TableName.valueOf("mytable")); - * try { + * try (Connection connection = ConnectionFactory.createConnection(conf); + * Table table = connection.getTable(TableName.valueOf("table1"))) { * table.get(...); * ... - * } finally { - * table.close(); - * connection.close(); * } * </pre> * * @return Connection object for <code>conf</code> */ public static Connection createConnection() throws IOException { - Configuration conf = HBaseConfiguration.create(); - return createConnection(conf, null, AuthUtil.loginClient(conf)); + return createConnection(HBaseConfiguration.create()); + } + + /** + * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all + * housekeeping for a connection to the cluster. All tables and interfaces created from returned + * connection share zookeeper connection, meta cache, and connections to region servers and + * masters. <br> + * The caller is responsible for calling {@link Connection#close()} on the returned connection + * instance. Typical usage: + * + * <pre> + * try (Connection connection = ConnectionFactory.createConnection(conf); + * Table table = connection.getTable(TableName.valueOf("table1"))) { + * table.get(...); + * ... + * } + * </pre> + * + * @param connectionUri the connection uri for the hbase cluster + * @return Connection object for <code>conf</code> + */ + public static Connection createConnection(URI connectionUri) throws IOException { + return createConnection(connectionUri, HBaseConfiguration.create()); } /** * Create a new Connection instance using the passed <code>conf</code> instance. Connection * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces - * created from returned connection share zookeeper connection, meta cache, and connections to - * region servers and masters. <br> + * created from returned connection share zookeeper connection(if used), meta cache, and + * connections to region servers and masters. <br> * The caller is responsible for calling {@link Connection#close()} on the returned connection * instance. Typical usage: * * <pre> - * Connection connection = ConnectionFactory.createConnection(conf); - * Table table = connection.getTable(TableName.valueOf("mytable")); - * try { + * try (Connection connection = ConnectionFactory.createConnection(conf); + * Table table = connection.getTable(TableName.valueOf("table1"))) { * table.get(...); * ... - * } finally { - * table.close(); - * connection.close(); * } * </pre> * @@ -137,20 +152,41 @@ public class ConnectionFactory { /** * Create a new Connection instance using the passed <code>conf</code> instance. Connection * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces - * created from returned connection share zookeeper connection, meta cache, and connections to - * region servers and masters. <br> + * created from returned connection share zookeeper connection(if used), meta cache, and + * connections to region servers and masters. <br> * The caller is responsible for calling {@link Connection#close()} on the returned connection * instance. Typical usage: * * <pre> - * Connection connection = ConnectionFactory.createConnection(conf); - * Table table = connection.getTable(TableName.valueOf("mytable")); - * try { + * try (Connection connection = ConnectionFactory.createConnection(conf); + * Table table = connection.getTable(TableName.valueOf("table1"))) { + * table.get(...); + * ... + * } + * </pre> + * + * @param connectionUri the connection uri for the hbase cluster + * @param conf configuration + * @return Connection object for <code>conf</code> + */ + public static Connection createConnection(URI connectionUri, Configuration conf) + throws IOException { + return createConnection(connectionUri, conf, null, AuthUtil.loginClient(conf)); + } + + /** + * Create a new Connection instance using the passed <code>conf</code> instance. Connection + * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces + * created from returned connection share zookeeper connection(if used), meta cache, and + * connections to region servers and masters. <br> + * The caller is responsible for calling {@link Connection#close()} on the returned connection + * instance. Typical usage: + * + * <pre> + * try (Connection connection = ConnectionFactory.createConnection(conf); + * Table table = connection.getTable(TableName.valueOf("table1"))) { * table.get(...); * ... - * } finally { - * table.close(); - * connection.close(); * } * </pre> * @@ -166,20 +202,42 @@ public class ConnectionFactory { /** * Create a new Connection instance using the passed <code>conf</code> instance. Connection * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces - * created from returned connection share zookeeper connection, meta cache, and connections to - * region servers and masters. <br> + * created from returned connection share zookeeper connection(if used), meta cache, and + * connections to region servers and masters. <br> * The caller is responsible for calling {@link Connection#close()} on the returned connection * instance. Typical usage: * * <pre> - * Connection connection = ConnectionFactory.createConnection(conf); - * Table table = connection.getTable(TableName.valueOf("table1")); - * try { + * try (Connection connection = ConnectionFactory.createConnection(conf); + * Table table = connection.getTable(TableName.valueOf("table1"))) { + * table.get(...); + * ... + * } + * </pre> + * + * @param connectionUri the connection uri for the hbase cluster + * @param conf configuration + * @param pool the thread pool to use for batch operations + * @return Connection object for <code>conf</code> + */ + public static Connection createConnection(URI connectionUri, Configuration conf, + ExecutorService pool) throws IOException { + return createConnection(connectionUri, conf, pool, AuthUtil.loginClient(conf)); + } + + /** + * Create a new Connection instance using the passed <code>conf</code> instance. Connection + * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces + * created from returned connection share zookeeper connection(if used), meta cache, and + * connections to region servers and masters. <br> + * The caller is responsible for calling {@link Connection#close()} on the returned connection + * instance. Typical usage: + * + * <pre> + * try (Connection connection = ConnectionFactory.createConnection(conf); + * Table table = connection.getTable(TableName.valueOf("table1"))) { * table.get(...); * ... - * } finally { - * table.close(); - * connection.close(); * } * </pre> * @@ -194,20 +252,42 @@ public class ConnectionFactory { /** * Create a new Connection instance using the passed <code>conf</code> instance. Connection * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces - * created from returned connection share zookeeper connection, meta cache, and connections to - * region servers and masters. <br> + * created from returned connection share zookeeper connection(if used), meta cache, and + * connections to region servers and masters. <br> * The caller is responsible for calling {@link Connection#close()} on the returned connection * instance. Typical usage: * * <pre> - * Connection connection = ConnectionFactory.createConnection(conf); - * Table table = connection.getTable(TableName.valueOf("table1")); - * try { + * try (Connection connection = ConnectionFactory.createConnection(conf); + * Table table = connection.getTable(TableName.valueOf("table1"))) { + * table.get(...); + * ... + * } + * </pre> + * + * @param connectionUri the connection uri for the hbase cluster + * @param conf configuration + * @param user the user the connection is for + * @return Connection object for <code>conf</code> + */ + public static Connection createConnection(URI connectionUri, Configuration conf, User user) + throws IOException { + return createConnection(connectionUri, conf, null, user); + } + + /** + * Create a new Connection instance using the passed <code>conf</code> instance. Connection + * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces + * created from returned connection share zookeeper connection(if used), meta cache, and + * connections to region servers and masters. <br> + * The caller is responsible for calling {@link Connection#close()} on the returned connection + * instance. Typical usage: + * + * <pre> + * try (Connection connection = ConnectionFactory.createConnection(conf); + * Table table = connection.getTable(TableName.valueOf("table1"))) { * table.get(...); * ... - * } finally { - * table.close(); - * connection.close(); * } * </pre> * @@ -224,20 +304,43 @@ public class ConnectionFactory { /** * Create a new Connection instance using the passed <code>conf</code> instance. Connection * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces - * created from returned connection share zookeeper connection, meta cache, and connections to - * region servers and masters. <br> + * created from returned connection share zookeeper connection(if used), meta cache, and + * connections to region servers and masters. <br> * The caller is responsible for calling {@link Connection#close()} on the returned connection * instance. Typical usage: * * <pre> - * Connection connection = ConnectionFactory.createConnection(conf); - * Table table = connection.getTable(TableName.valueOf("table1")); - * try { + * try (Connection connection = ConnectionFactory.createConnection(conf); + * Table table = connection.getTable(TableName.valueOf("table1"))) { + * table.get(...); + * ... + * } + * </pre> + * + * @param connectionUri the connection uri for the hbase cluster + * @param conf configuration + * @param user the user the connection is for + * @param pool the thread pool to use for batch operations + * @return Connection object for <code>conf</code> + */ + public static Connection createConnection(URI connectionUri, Configuration conf, + ExecutorService pool, User user) throws IOException { + return createConnection(connectionUri, conf, pool, user, Collections.emptyMap()); + } + + /** + * Create a new Connection instance using the passed <code>conf</code> instance. Connection + * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces + * created from returned connection share zookeeper connection(if used), meta cache, and + * connections to region servers and masters. <br> + * The caller is responsible for calling {@link Connection#close()} on the returned connection + * instance. Typical usage: + * + * <pre> + * try (Connection connection = ConnectionFactory.createConnection(conf); + * Table table = connection.getTable(TableName.valueOf("table1"))) { * table.get(...); * ... - * } finally { - * table.close(); - * connection.close(); * } * </pre> * @@ -249,6 +352,37 @@ public class ConnectionFactory { */ public static Connection createConnection(Configuration conf, ExecutorService pool, final User user, Map<String, byte[]> connectionAttributes) throws IOException { + return createConnection(null, conf, pool, user, connectionAttributes); + } + + /** + * Create a new Connection instance using the passed <code>conf</code> instance. Connection + * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces + * created from returned connection share zookeeper connection(if used), meta cache, and + * connections to region servers and masters. <br> + * The caller is responsible for calling {@link Connection#close()} on the returned connection + * instance. Typical usage: + * + * <pre> + * Connection connection = ConnectionFactory.createConnection(conf); + * Table table = connection.getTable(TableName.valueOf("table1")); + * try (Connection connection = ConnectionFactory.createConnection(conf); + * Table table = connection.getTable(TableName.valueOf("table1"))) { + * table.get(...); + * ... + * } + * </pre> + * + * @param connectionUri the connection uri for the hbase cluster + * @param conf configuration + * @param user the user the connection is for + * @param pool the thread pool to use for batch operations + * @param connectionAttributes attributes to be sent along to server during connection establish + * @return Connection object for <code>conf</code> + */ + public static Connection createConnection(URI connectionUri, Configuration conf, + ExecutorService pool, final User user, Map<String, byte[]> connectionAttributes) + throws IOException { Class<?> clazz = conf.getClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL, ConnectionOverAsyncConnection.class, Connection.class); if (clazz != ConnectionOverAsyncConnection.class) { @@ -263,7 +397,7 @@ public class ConnectionFactory { throw new IOException(e); } } else { - return FutureUtils.get(createAsyncConnection(conf, user, connectionAttributes)) + return FutureUtils.get(createAsyncConnection(connectionUri, conf, user, connectionAttributes)) .toConnection(); } } @@ -277,6 +411,16 @@ public class ConnectionFactory { return createAsyncConnection(HBaseConfiguration.create()); } + /** + * Call {@link #createAsyncConnection(URI, Configuration)} using default HBaseConfiguration. + * @param connectionUri the connection uri for the hbase cluster + * @see #createAsyncConnection(URI, Configuration) + * @return AsyncConnection object wrapped by CompletableFuture + */ + public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri) { + return createAsyncConnection(connectionUri, HBaseConfiguration.create()); + } + /** * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code conf} and a * User object created by {@link UserProvider}. The given {@code conf} will also be used to @@ -287,6 +431,21 @@ public class ConnectionFactory { * @see UserProvider */ public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf) { + return createAsyncConnection(null, conf); + } + + /** + * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code connectionUri}, + * {@code conf} and a User object created by {@link UserProvider}. The given {@code conf} will + * also be used to initialize the {@link UserProvider}. + * @param connectionUri the connection uri for the hbase cluster + * @param conf configuration + * @return AsyncConnection object wrapped by CompletableFuture + * @see #createAsyncConnection(Configuration, User) + * @see UserProvider + */ + public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri, + Configuration conf) { User user; try { user = AuthUtil.loginClient(conf); @@ -295,7 +454,7 @@ public class ConnectionFactory { future.completeExceptionally(e); return future; } - return createAsyncConnection(conf, user); + return createAsyncConnection(connectionUri, conf, user); } /** @@ -315,7 +474,28 @@ public class ConnectionFactory { */ public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf, final User user) { - return createAsyncConnection(conf, user, null); + return createAsyncConnection(null, conf, user); + } + + /** + * Create a new AsyncConnection instance using the passed {@code connectionUri}, {@code conf} and + * {@code user}. AsyncConnection encapsulates all housekeeping for a connection to the cluster. + * All tables and interfaces created from returned connection share zookeeper connection(if used), + * meta cache, and connections to region servers and masters. + * <p> + * The caller is responsible for calling {@link AsyncConnection#close()} on the returned + * connection instance. + * <p> + * Usually you should only create one AsyncConnection instance in your code and use it everywhere + * as it is thread safe. + * @param connectionUri the connection uri for the hbase cluster + * @param conf configuration + * @param user the user the asynchronous connection is for + * @return AsyncConnection object wrapped by CompletableFuture + */ + public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri, + Configuration conf, final User user) { + return createAsyncConnection(connectionUri, conf, user, null); } /** @@ -336,9 +516,38 @@ public class ConnectionFactory { */ public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf, final User user, Map<String, byte[]> connectionAttributes) { + return createAsyncConnection(null, conf, user, connectionAttributes); + } + + /** + * Create a new AsyncConnection instance using the passed {@code connectionUri}, {@code conf} and + * {@code user}. AsyncConnection encapsulates all housekeeping for a connection to the cluster. + * All tables and interfaces created from returned connection share zookeeper connection(if used), + * meta cache, and connections to region servers and masters. + * <p> + * The caller is responsible for calling {@link AsyncConnection#close()} on the returned + * connection instance. + * <p> + * Usually you should only create one AsyncConnection instance in your code and use it everywhere + * as it is thread safe. + * @param connectionUri the connection uri for the hbase cluster + * @param conf configuration + * @param user the user the asynchronous connection is for + * @param connectionAttributes attributes to be sent along to server during connection establish + * @return AsyncConnection object wrapped by CompletableFuture + */ + public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri, + Configuration conf, final User user, Map<String, byte[]> connectionAttributes) { return TraceUtil.tracedFuture(() -> { + ConnectionRegistry registry; + try { + registry = connectionUri != null + ? ConnectionRegistryFactory.create(connectionUri, conf, user) + : ConnectionRegistryFactory.create(conf, user); + } catch (Exception e) { + return FutureUtils.failedFuture(e); + } CompletableFuture<AsyncConnection> future = new CompletableFuture<>(); - ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf, user); addListener(registry.getClusterId(), (clusterId, error) -> { if (error != null) { registry.close(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java index 415d46397b8..5eef2c5f93e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java @@ -17,27 +17,77 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY; - +import java.io.IOException; +import java.net.URI; +import java.util.ServiceLoader; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; /** - * Factory class to get the instance of configured connection registry. + * The entry point for creating a {@link ConnectionRegistry}. */ @InterfaceAudience.Private final class ConnectionRegistryFactory { + private static final Logger LOG = LoggerFactory.getLogger(ConnectionRegistryFactory.class); + + private static final ImmutableMap<String, ConnectionRegistryURIFactory> CREATORS; + static { + ImmutableMap.Builder<String, ConnectionRegistryURIFactory> builder = ImmutableMap.builder(); + for (ConnectionRegistryURIFactory factory : ServiceLoader + .load(ConnectionRegistryURIFactory.class)) { + builder.put(factory.getScheme().toLowerCase(), factory); + } + // throw IllegalArgumentException if there are duplicated keys + CREATORS = builder.buildOrThrow(); + } + private ConnectionRegistryFactory() { } - /** Returns The connection registry implementation to use. */ - static ConnectionRegistry getRegistry(Configuration conf, User user) { + /** + * Returns the connection registry implementation to use, for the given connection url + * {@code uri}. + * <p/> + * We use {@link ServiceLoader} to load different implementations, and use the scheme of the given + * {@code uri} to select. And if there is no protocol specified, or we can not find a + * {@link ConnectionRegistryURIFactory} implementation for the given scheme, we will fallback to + * use the old way to create the {@link ConnectionRegistry}. Notice that, if fallback happens, the + * specified connection url {@code uri} will not take effect, we will load all the related + * configurations from the given Configuration instance {@code conf} + */ + static ConnectionRegistry create(URI uri, Configuration conf, User user) throws IOException { + if (StringUtils.isBlank(uri.getScheme())) { + LOG.warn("No scheme specified for {}, fallback to use old way", uri); + return create(conf, user); + } + ConnectionRegistryURIFactory creator = CREATORS.get(uri.getScheme().toLowerCase()); + if (creator == null) { + LOG.warn("No creator registered for {}, fallback to use old way", uri); + return create(conf, user); + } + return creator.create(uri, conf, user); + } + + /** + * Returns the connection registry implementation to use. + * <p/> + * This is used when we do not have a connection url, we will use the old way to load the + * connection registry, by checking the + * {@literal HConstants#CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY} configuration. + */ + static ConnectionRegistry create(Configuration conf, User user) { Class<? extends ConnectionRegistry> clazz = - conf.getClass(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, RpcConnectionRegistry.class, - ConnectionRegistry.class); + conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, + RpcConnectionRegistry.class, ConnectionRegistry.class); return ReflectionUtils.newInstance(clazz, conf, user); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryURIFactory.java similarity index 60% copy from hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java copy to hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryURIFactory.java index 415d46397b8..ab2037a1c13 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryURIFactory.java @@ -17,27 +17,26 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY; - +import java.io.IOException; +import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; /** - * Factory class to get the instance of configured connection registry. + * For creating different {@link ConnectionRegistry} implementation. */ @InterfaceAudience.Private -final class ConnectionRegistryFactory { +public interface ConnectionRegistryURIFactory { - private ConnectionRegistryFactory() { - } + /** + * Instantiate the {@link ConnectionRegistry} using the given parameters. + */ + ConnectionRegistry create(URI uri, Configuration conf, User user) throws IOException; - /** Returns The connection registry implementation to use. */ - static ConnectionRegistry getRegistry(Configuration conf, User user) { - Class<? extends ConnectionRegistry> clazz = - conf.getClass(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, RpcConnectionRegistry.class, - ConnectionRegistry.class); - return ReflectionUtils.newInstance(clazz, conf, user); - } + /** + * The scheme for this implementation. Used to register this URI factory to the + * {@link ConnectionRegistryFactory}. + */ + String getScheme(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistryCreator.java similarity index 54% copy from hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java copy to hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistryCreator.java index 415d46397b8..cb2338b1429 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistryCreator.java @@ -17,27 +17,33 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY; - +import java.io.IOException; +import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * Factory class to get the instance of configured connection registry. + * Connection registry creator implementation for creating {@link RpcConnectionRegistry}. */ @InterfaceAudience.Private -final class ConnectionRegistryFactory { +public class RpcConnectionRegistryCreator implements ConnectionRegistryURIFactory { + + private static final Logger LOG = LoggerFactory.getLogger(RpcConnectionRegistryCreator.class); - private ConnectionRegistryFactory() { + @Override + public ConnectionRegistry create(URI uri, Configuration conf, User user) throws IOException { + assert getScheme().equals(uri.getScheme()); + LOG.debug("connect to hbase cluster with rpc bootstrap servers='{}'", uri.getAuthority()); + Configuration c = new Configuration(conf); + c.set(RpcConnectionRegistry.BOOTSTRAP_NODES, uri.getAuthority()); + return new RpcConnectionRegistry(c, user); } - /** Returns The connection registry implementation to use. */ - static ConnectionRegistry getRegistry(Configuration conf, User user) { - Class<? extends ConnectionRegistry> clazz = - conf.getClass(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, RpcConnectionRegistry.class, - ConnectionRegistry.class); - return ReflectionUtils.newInstance(clazz, conf, user); + @Override + public String getScheme() { + return "hbase+rpc"; } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistryCreator.java similarity index 51% copy from hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java copy to hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistryCreator.java index 415d46397b8..8aa51e04fe4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistryCreator.java @@ -17,27 +17,36 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY; - +import java.io.IOException; +import java.net.URI; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * Factory class to get the instance of configured connection registry. + * Connection registry creator implementation for creating {@link ZKConnectionRegistry}. */ @InterfaceAudience.Private -final class ConnectionRegistryFactory { +public class ZKConnectionRegistryCreator implements ConnectionRegistryURIFactory { + + private static final Logger LOG = LoggerFactory.getLogger(ZKConnectionRegistryCreator.class); - private ConnectionRegistryFactory() { + @Override + public ConnectionRegistry create(URI uri, Configuration conf, User user) throws IOException { + assert getScheme().equals(uri.getScheme()); + LOG.debug("connect to hbase cluster with zk quorum='{}' and parent='{}'", uri.getAuthority(), + uri.getPath()); + Configuration c = new Configuration(conf); + c.set(HConstants.CLIENT_ZOOKEEPER_QUORUM, uri.getAuthority()); + c.set(HConstants.ZOOKEEPER_ZNODE_PARENT, uri.getPath()); + return new ZKConnectionRegistry(c, user); } - /** Returns The connection registry implementation to use. */ - static ConnectionRegistry getRegistry(Configuration conf, User user) { - Class<? extends ConnectionRegistry> clazz = - conf.getClass(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, RpcConnectionRegistry.class, - ConnectionRegistry.class); - return ReflectionUtils.newInstance(clazz, conf, user); + @Override + public String getScheme() { + return "hbase+zk"; } } diff --git a/hbase-client/src/main/resources/META-INF/services/org.apache.hadoop.hbase.client.ConnectionRegistryURIFactory b/hbase-client/src/main/resources/META-INF/services/org.apache.hadoop.hbase.client.ConnectionRegistryURIFactory new file mode 100644 index 00000000000..b25a569776f --- /dev/null +++ b/hbase-client/src/main/resources/META-INF/services/org.apache.hadoop.hbase.client.ConnectionRegistryURIFactory @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.hadoop.hbase.client.RpcConnectionRegistryCreator +org.apache.hadoop.hbase.client.ZKConnectionRegistryCreator \ No newline at end of file diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryCreatorUriParsing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryCreatorUriParsing.java new file mode 100644 index 00000000000..4dabd894b5b --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryCreatorUriParsing.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.mockStatic; + +import java.net.URI; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.ReflectionUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.ArgumentCaptor; +import org.mockito.MockedConstruction; +import org.mockito.MockedStatic; + +/** + * Make sure we can successfully parse the URI component + */ +@Category({ ClientTests.class, SmallTests.class }) +public class TestConnectionRegistryCreatorUriParsing { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestConnectionRegistryCreatorUriParsing.class); + + private Configuration conf; + + private User user; + + private MockedConstruction<RpcConnectionRegistry> mockedRpcRegistry; + + private MockedConstruction<ZKConnectionRegistry> mockedZkRegistry; + + private MockedStatic<ReflectionUtils> mockedReflectionUtils; + + private List<?> args; + + @Before + public void setUp() { + conf = HBaseConfiguration.create(); + user = mock(User.class); + args = null; + mockedRpcRegistry = mockConstruction(RpcConnectionRegistry.class, (mock, context) -> { + args = context.arguments(); + }); + mockedZkRegistry = mockConstruction(ZKConnectionRegistry.class, (mock, context) -> { + args = context.arguments(); + }); + mockedReflectionUtils = mockStatic(ReflectionUtils.class); + } + + @After + public void tearDown() { + mockedRpcRegistry.closeOnDemand(); + mockedZkRegistry.closeOnDemand(); + mockedReflectionUtils.closeOnDemand(); + } + + @Test + public void testParseRpcSingle() throws Exception { + ConnectionRegistryFactory.create(new URI("hbase+rpc://server1:123"), conf, user); + assertEquals(1, mockedRpcRegistry.constructed().size()); + assertSame(user, args.get(1)); + Configuration conf = (Configuration) args.get(0); + assertEquals("server1:123", conf.get(RpcConnectionRegistry.BOOTSTRAP_NODES)); + } + + @Test + public void testParseRpcMultiple() throws Exception { + ConnectionRegistryFactory.create(new URI("hbase+rpc://server1:123,server2:456,server3:789"), + conf, user); + assertEquals(1, mockedRpcRegistry.constructed().size()); + assertSame(user, args.get(1)); + Configuration conf = (Configuration) args.get(0); + assertEquals("server1:123,server2:456,server3:789", + conf.get(RpcConnectionRegistry.BOOTSTRAP_NODES)); + } + + @Test + public void testParseZkSingle() throws Exception { + ConnectionRegistryFactory.create(new URI("hbase+zk://server1:123/root"), conf, user); + assertEquals(1, mockedZkRegistry.constructed().size()); + assertSame(user, args.get(1)); + Configuration conf = (Configuration) args.get(0); + assertEquals("server1:123", conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM)); + assertEquals("/root", conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); + } + + @Test + public void testParseZkMultiple() throws Exception { + ConnectionRegistryFactory + .create(new URI("hbase+zk://server1:123,server2:456,server3:789/root/path"), conf, user); + assertEquals(1, mockedZkRegistry.constructed().size()); + assertSame(user, args.get(1)); + Configuration conf = (Configuration) args.get(0); + assertEquals("server1:123,server2:456,server3:789", + conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM)); + assertEquals("/root/path", conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); + } + + @Test + public void testFallbackNoScheme() throws Exception { + conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class, + ConnectionRegistry.class); + ConnectionRegistryFactory.create(new URI("server1:2181/path"), conf, user); + ArgumentCaptor<Class<?>> clazzCaptor = ArgumentCaptor.forClass(Class.class); + ArgumentCaptor<Object[]> argsCaptor = ArgumentCaptor.forClass(Object[].class); + mockedReflectionUtils + .verify(() -> ReflectionUtils.newInstance(clazzCaptor.capture(), argsCaptor.capture())); + assertEquals(ZKConnectionRegistry.class, clazzCaptor.getValue()); + assertSame(conf, argsCaptor.getValue()[0]); + assertSame(user, argsCaptor.getValue()[1]); + } + + @Test + public void testFallbackNoCreator() throws Exception { + conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, RpcConnectionRegistry.class, + ConnectionRegistry.class); + ConnectionRegistryFactory.create(new URI("hbase+tls://server1:123/path"), conf, user); + ArgumentCaptor<Class<?>> clazzCaptor = ArgumentCaptor.forClass(Class.class); + ArgumentCaptor<Object[]> argsCaptor = ArgumentCaptor.forClass(Object[].class); + mockedReflectionUtils + .verify(() -> ReflectionUtils.newInstance(clazzCaptor.capture(), argsCaptor.capture())); + assertEquals(RpcConnectionRegistry.class, clazzCaptor.getValue()); + assertSame(conf, argsCaptor.getValue()[0]); + assertSame(user, argsCaptor.getValue()[1]); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java index 7225f92b7ff..ed90863763a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java @@ -64,7 +64,7 @@ public final class ClusterConnectionFactory { */ public static AsyncClusterConnection createAsyncClusterConnection(Configuration conf, SocketAddress localAddress, User user) throws IOException { - return createAsyncClusterConnection(conf, ConnectionRegistryFactory.getRegistry(conf, user), + return createAsyncClusterConnection(conf, ConnectionRegistryFactory.create(conf, user), localAddress, user); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java index 0ff105743e0..031dff736c8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java @@ -60,7 +60,7 @@ public abstract class AbstractTestRegionLocator { UTIL.getAdmin().createTable(td, SPLIT_KEYS); UTIL.waitTableAvailable(TABLE_NAME); try (ConnectionRegistry registry = - ConnectionRegistryFactory.getRegistry(UTIL.getConfiguration(), User.getCurrent())) { + ConnectionRegistryFactory.create(UTIL.getConfiguration(), User.getCurrent())) { RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry); } UTIL.getAdmin().balancerSwitch(false, true); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java index da400f29c0c..bb0eb31d254 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java @@ -56,7 +56,7 @@ public class TestAsyncAdminWithRegionReplicas extends TestAsyncAdminBase { TestAsyncAdminBase.setUpBeforeClass(); HBaseTestingUtil.setReplicas(TEST_UTIL.getAdmin(), TableName.META_TABLE_NAME, 3); try (ConnectionRegistry registry = - ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent())) { + ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent())) { RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, registry); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java index 90d2cb51e8c..e14cd32a388 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java @@ -107,8 +107,7 @@ public class TestAsyncMetaRegionLocator { testUtil = miniClusterRule.getTestingUtility(); HBaseTestingUtil.setReplicas(admin, TableName.META_TABLE_NAME, 3); testUtil.waitUntilNoRegionsInTransition(); - registry = - ConnectionRegistryFactory.getRegistry(testUtil.getConfiguration(), User.getCurrent()); + registry = ConnectionRegistryFactory.create(testUtil.getConfiguration(), User.getCurrent()); RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(testUtil, registry); admin.balancerSwitch(false).get(); locator = new AsyncMetaRegionLocator(registry); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java index a6d0ab81f91..6a5230b3a12 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java @@ -128,7 +128,7 @@ public class TestAsyncNonMetaRegionLocator { // Enable meta replica LoadBalance mode for this connection. c.set(RegionLocator.LOCATOR_META_REPLICAS_MODE, metaReplicaMode.toString()); ConnectionRegistry registry = - ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent()); + ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent()); conn = new AsyncConnectionImpl(c, registry, registry.getClusterId().get(), null, User.getCurrent()); locator = new AsyncNonMetaRegionLocator(conn); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java index 50c9ab9f565..439d527effc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java @@ -125,7 +125,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit { TEST_UTIL.startMiniCluster(3); TEST_UTIL.getAdmin().balancerSwitch(false, true); ConnectionRegistry registry = - ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent()); + ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent()); CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, registry.getClusterId().get(), null, User.getCurrent()); LOCATOR = new AsyncNonMetaRegionLocator(CONN); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java index bacd7bb32d7..2291c28a7c8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java @@ -100,7 +100,7 @@ public class TestAsyncRegionLocator { TEST_UTIL.createTable(TABLE_NAME, FAMILY); TEST_UTIL.waitTableAvailable(TABLE_NAME); ConnectionRegistry registry = - ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent()); + ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent()); CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, registry.getClusterId().get(), null, User.getCurrent()); LOCATOR = CONN.getLocator(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java index 3c8327145f3..baa4ee74ade 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java @@ -73,7 +73,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller { TEST_UTIL.createTable(TABLE_NAME, FAMILY); TEST_UTIL.waitTableAvailable(TABLE_NAME); ConnectionRegistry registry = - ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent()); + ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent()); CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, registry.getClusterId().get(), null, User.getCurrent()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java index 0de59a4c32b..2803db20e71 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java @@ -95,8 +95,7 @@ public class TestAsyncTableUseMetaReplicas { FailPrimaryMetaScanCp.class.getName()); UTIL.startMiniCluster(3); HBaseTestingUtil.setReplicas(UTIL.getAdmin(), TableName.META_TABLE_NAME, 3); - try (ConnectionRegistry registry = - ConnectionRegistryFactory.getRegistry(conf, User.getCurrent())) { + try (ConnectionRegistry registry = ConnectionRegistryFactory.create(conf, User.getCurrent())) { RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry); } try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBasicReadWriteWithDifferentConnectionRegistries.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBasicReadWriteWithDifferentConnectionRegistries.java new file mode 100644 index 00000000000..5746ffa67f6 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBasicReadWriteWithDifferentConnectionRegistries.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertFalse; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNameTestRule; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test basic read write operation with different {@link ConnectionRegistry} implementations. + */ +@RunWith(Parameterized.class) +@Category({ MediumTests.class, ClientTests.class }) +public class TestBasicReadWriteWithDifferentConnectionRegistries { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestBasicReadWriteWithDifferentConnectionRegistries.class); + + private static final Logger LOG = + LoggerFactory.getLogger(TestBasicReadWriteWithDifferentConnectionRegistries.class); + + private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); + + public enum RegistryImpl { + ZK, + RPC, + ZK_URI, + RPC_URI + } + + @Parameter + public RegistryImpl impl; + + @Rule + public final TableNameTestRule name = new TableNameTestRule(); + + private byte[] FAMILY = Bytes.toBytes("family"); + + private Connection conn; + + @Parameters(name = "{index}: impl={0}") + public static List<Object[]> data() { + List<Object[]> data = new ArrayList<Object[]>(); + for (RegistryImpl impl : RegistryImpl.values()) { + data.add(new Object[] { impl }); + } + return data; + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + UTIL.startMiniCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws Exception { + switch (impl) { + case ZK: { + Configuration conf = HBaseConfiguration.create(); + conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, + ZKConnectionRegistry.class, ConnectionRegistry.class); + String quorum = UTIL.getZkCluster().getAddress().toString(); + String path = UTIL.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT); + conf.set(HConstants.CLIENT_ZOOKEEPER_QUORUM, quorum); + conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, path); + LOG.info("connect to cluster through zk quorum={} and parent={}", quorum, path); + conn = ConnectionFactory.createConnection(conf); + break; + } + case RPC: { + Configuration conf = HBaseConfiguration.create(); + conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, + RpcConnectionRegistry.class, ConnectionRegistry.class); + String bootstrapServers = + UTIL.getMiniHBaseCluster().getMaster().getServerName().getAddress().toString(); + conf.set(RpcConnectionRegistry.BOOTSTRAP_NODES, bootstrapServers); + LOG.info("connect to cluster through rpc bootstrap servers={}", bootstrapServers); + conn = ConnectionFactory.createConnection(conf); + break; + } + case ZK_URI: { + String quorum = UTIL.getZkCluster().getAddress().toString(); + String path = UTIL.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT); + URI connectionUri = new URI("hbase+zk://" + quorum + path); + LOG.info("connect to cluster through connection url: {}", connectionUri); + conn = ConnectionFactory.createConnection(connectionUri); + break; + } + case RPC_URI: { + URI connectionUri = new URI("hbase+rpc://" + + UTIL.getMiniHBaseCluster().getMaster().getServerName().getAddress().toString()); + LOG.info("connect to cluster through connection url: {}", connectionUri); + conn = ConnectionFactory.createConnection(connectionUri); + break; + } + default: + throw new IllegalArgumentException("Unknown impl: " + impl); + } + try (Admin admin = conn.getAdmin()) { + admin.createTable(TableDescriptorBuilder.newBuilder(name.getTableName()) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build()); + } + } + + @After + public void tearDown() throws Exception { + TableName tableName = name.getTableName(); + try (Admin admin = conn.getAdmin()) { + admin.disableTable(tableName); + admin.deleteTable(tableName); + } + conn.close(); + } + + @Test + public void testReadWrite() throws Exception { + byte[] row = Bytes.toBytes("row"); + byte[] qualifier = Bytes.toBytes("qualifier"); + byte[] value = Bytes.toBytes("value"); + try (Table table = conn.getTable(name.getTableName())) { + Put put = new Put(row).addColumn(FAMILY, qualifier, value); + table.put(put); + Result result = table.get(new Get(row)); + assertArrayEquals(value, result.getValue(FAMILY, qualifier)); + table.delete(new Delete(row)); + assertFalse(table.exists(new Get(row))); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java index 5c78e53f7e6..12f278ebbfd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java @@ -77,8 +77,7 @@ public class TestCatalogReplicaLoadBalanceSimpleSelector { () -> TEST_UTIL.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME).size() >= numOfMetaReplica); - registry = - ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent()); + registry = ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent()); CONN = new AsyncConnectionImpl(conf, registry, registry.getClusterId().get(), null, User.getCurrent()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java index beb054eaf36..29223dea5db 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java @@ -64,8 +64,7 @@ public class TestMetaRegionLocationCache { public static void setUp() throws Exception { TEST_UTIL.startMiniCluster(3); HBaseTestingUtil.setReplicas(TEST_UTIL.getAdmin(), TableName.META_TABLE_NAME, 3); - REGISTRY = - ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent()); + REGISTRY = ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent()); RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY); TEST_UTIL.getAdmin().balancerSwitch(false, true); }