This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch HBASE-21512 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit ec42d7ec75be580996fa5853987d5e85707f9d51 Author: zhangduo <zhang...@apache.org> AuthorDate: Sat Dec 1 21:15:48 2018 +0800 HBASE-21516 Use AsyncConnection instead of Connection in SecureBulkLoadManager --- .../apache/hadoop/hbase/protobuf/ProtobufUtil.java | 5 +- .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 7 ++- .../hadoop/hbase/regionserver/HRegionServer.java | 2 +- .../hbase/regionserver/SecureBulkLoadManager.java | 24 ++++----- .../hadoop/hbase/security/token/TokenUtil.java | 57 +++++++++++++++++----- .../hadoop/hbase/security/token/TestTokenUtil.java | 42 ++++++++++++---- 6 files changed, 96 insertions(+), 41 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index a3d49b5..d9e620b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -261,13 +261,12 @@ public final class ProtobufUtil { * just {@link ServiceException}. Prefer this method to * {@link #getRemoteException(ServiceException)} because trying to * contain direct protobuf references. - * @param e */ - public static IOException handleRemoteException(Exception e) { + public static IOException handleRemoteException(Throwable e) { return makeIOExceptionOfException(e); } - private static IOException makeIOExceptionOfException(Exception e) { + private static IOException makeIOExceptionOfException(Throwable e) { Throwable t = e; if (e instanceof ServiceException || e instanceof org.apache.hbase.thirdparty.com.google.protobuf.ServiceException) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index fea81f1..de2fb7d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -40,7 +40,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.regex.Pattern; import java.util.stream.Collectors; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ByteBufferExtendedCell; @@ -123,6 +122,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Service; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; + import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; @@ -343,13 +343,12 @@ public final class ProtobufUtil { * just {@link ServiceException}. Prefer this method to * {@link #getRemoteException(ServiceException)} because trying to * contain direct protobuf references. - * @param e */ - public static IOException handleRemoteException(Exception e) { + public static IOException handleRemoteException(Throwable e) { return makeIOExceptionOfException(e); } - private static IOException makeIOExceptionOfException(Exception e) { + private static IOException makeIOExceptionOfException(Throwable e) { Throwable t = e; if (e instanceof ServiceException) { t = e.getCause(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index b4b1d3e..bbc9d3c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1937,7 +1937,7 @@ public class HRegionServer extends HasThread implements if (!isStopped() && !isAborted()) { initializeThreads(); } - this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection); + this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection); this.secureBulkLoadManager.start(); // Health checker thread. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java index 566a6b6..add6519 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -28,7 +28,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiFunction; import java.util.function.Consumer; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -38,11 +37,12 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.AsyncConnection; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.regionserver.HRegion.BulkLoadListener; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier; import org.apache.hadoop.hbase.security.token.FsDelegationToken; import org.apache.hadoop.hbase.security.token.TokenUtil; import org.apache.hadoop.hbase.util.Bytes; @@ -56,7 +56,9 @@ import org.apache.hadoop.security.token.Token; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; @@ -111,9 +113,9 @@ public class SecureBulkLoadManager { private UserProvider userProvider; private ConcurrentHashMap<UserGroupInformation, Integer> ugiReferenceCounter; - private Connection conn; + private AsyncConnection conn; - SecureBulkLoadManager(Configuration conf, Connection conn) { + SecureBulkLoadManager(Configuration conf, AsyncConnection conn) { this.conf = conf; this.conn = conn; } @@ -212,23 +214,23 @@ public class SecureBulkLoadManager { familyPaths.add(new Pair<>(el.getFamily().toByteArray(), el.getPath())); } - Token userToken = null; + Token<AuthenticationTokenIdentifier> userToken = null; if (userProvider.isHadoopSecurityEnabled()) { - userToken = new Token(request.getFsToken().getIdentifier().toByteArray(), request.getFsToken() - .getPassword().toByteArray(), new Text(request.getFsToken().getKind()), new Text( - request.getFsToken().getService())); + userToken = new Token<>(request.getFsToken().getIdentifier().toByteArray(), + request.getFsToken().getPassword().toByteArray(), new Text(request.getFsToken().getKind()), + new Text(request.getFsToken().getService())); } final String bulkToken = request.getBulkToken(); User user = getActiveUser(); final UserGroupInformation ugi = user.getUGI(); if (userProvider.isHadoopSecurityEnabled()) { try { - Token tok = TokenUtil.obtainToken(conn); + Token<AuthenticationTokenIdentifier> tok = TokenUtil.obtainToken(conn).get(); if (tok != null) { boolean b = ugi.addToken(tok); LOG.debug("token added " + tok + " for user " + ugi + " return=" + b); } - } catch (IOException ioe) { + } catch (Exception ioe) { LOG.warn("unable to add token", ioe); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java index c54d905..28efb84 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,27 +15,29 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.security.token; +import com.google.protobuf.ByteString; +import com.google.protobuf.ServiceException; import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; import java.security.PrivilegedExceptionAction; - -import com.google.protobuf.ByteString; -import com.google.protobuf.ServiceException; - -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import java.util.concurrent.CompletableFuture; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.client.AsyncTable; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; +import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.AuthenticationService; +import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.GetAuthenticationTokenRequest; +import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.GetAuthenticationTokenResponse; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; @@ -45,6 +47,8 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; + /** * Utility methods for obtaining authentication tokens. */ @@ -64,12 +68,39 @@ public class TokenUtil { /** * Obtain and return an authentication token for the current user. + * @param conn The async HBase cluster connection + * @return the authentication token instance, wrapped by a {@link CompletableFuture}. + */ + public static CompletableFuture<Token<AuthenticationTokenIdentifier>> obtainToken( + AsyncConnection conn) { + CompletableFuture<Token<AuthenticationTokenIdentifier>> future = new CompletableFuture<>(); + if (injectedException != null) { + future.completeExceptionally(injectedException); + return future; + } + AsyncTable<?> table = conn.getTable(TableName.META_TABLE_NAME); + table.<AuthenticationService.Interface, GetAuthenticationTokenResponse> coprocessorService( + AuthenticationProtos.AuthenticationService::newStub, + (s, c, r) -> s.getAuthenticationToken(c, + AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance(), r), + HConstants.EMPTY_START_ROW).whenComplete((resp, error) -> { + if (error != null) { + future.completeExceptionally(ProtobufUtil.handleRemoteException(error)); + } else { + future.complete(toToken(resp.getToken())); + } + }); + return future; + } + + /** + * Obtain and return an authentication token for the current user. * @param conn The HBase cluster connection * @throws IOException if a remote error or serialization problem occurs. * @return the authentication token instance */ - public static Token<AuthenticationTokenIdentifier> obtainToken( - Connection conn) throws IOException { + public static Token<AuthenticationTokenIdentifier> obtainToken(Connection conn) + throws IOException { Table meta = null; try { injectFault(); @@ -77,9 +108,9 @@ public class TokenUtil { meta = conn.getTable(TableName.META_TABLE_NAME); CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW); AuthenticationProtos.AuthenticationService.BlockingInterface service = - AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel); - AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null, - AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance()); + AuthenticationService.newBlockingStub(rpcChannel); + GetAuthenticationTokenResponse response = + service.getAuthenticationToken(null, GetAuthenticationTokenRequest.getDefaultInstance()); return toToken(response.getToken()); } catch (ServiceException se) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenUtil.java index 32fcddb..585a3ec 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenUtil.java @@ -18,35 +18,53 @@ package org.apache.hadoop.hbase.security.token; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; import static org.junit.Assert.fail; +import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.net.URL; import java.net.URLClassLoader; - +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.client.AsyncConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.After; +import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @Category(SmallTests.class) public class TestTokenUtil { + @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestTokenUtil.class); + HBaseClassTestRule.forClass(TestTokenUtil.class); - @Test - public void testObtainToken() throws Exception { + private URLClassLoader cl; + + @Before + public void setUp() { URL urlPU = ProtobufUtil.class.getProtectionDomain().getCodeSource().getLocation(); URL urlTU = TokenUtil.class.getProtectionDomain().getCodeSource().getLocation(); + cl = new URLClassLoader(new URL[] { urlPU, urlTU }, getClass().getClassLoader()); + } - ClassLoader cl = new URLClassLoader(new URL[] { urlPU, urlTU }, getClass().getClassLoader()); + @After + public void tearDown() throws IOException { + Closeables.close(cl, true); + } + @Test + public void testObtainToken() throws Exception { Throwable injected = new com.google.protobuf.ServiceException("injected"); Class<?> tokenUtil = cl.loadClass(TokenUtil.class.getCanonicalName()); @@ -55,8 +73,7 @@ public class TestTokenUtil { shouldInjectFault.set(null, injected); try { - tokenUtil.getMethod("obtainToken", Connection.class) - .invoke(null, new Object[] { null }); + tokenUtil.getMethod("obtainToken", Connection.class).invoke(null, new Object[] { null }); fail("Should have injected exception."); } catch (InvocationTargetException e) { Throwable t = e; @@ -72,9 +89,16 @@ public class TestTokenUtil { } } + CompletableFuture<?> future = (CompletableFuture<?>) tokenUtil + .getMethod("obtainToken", AsyncConnection.class).invoke(null, new Object[] { null }); + try { + future.get(); + fail("Should have injected exception."); + } catch (ExecutionException e) { + assertSame(injected, e.getCause()); + } Boolean loaded = (Boolean) cl.loadClass(ProtobufUtil.class.getCanonicalName()) - .getDeclaredMethod("isClassLoaderLoaded") - .invoke(null); + .getDeclaredMethod("isClassLoaderLoaded").invoke(null); assertFalse("Should not have loaded DynamicClassLoader", loaded); } }