This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch HBASE-21512 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit e0c580dcc86f220d364589cf9ad9bd034d11193b Author: zhangduo <zhang...@apache.org> AuthorDate: Thu Dec 6 21:25:34 2018 +0800 HBASE-21526 Use AsyncClusterConnection in ServerManager for getRsAdmin --- .../hbase/client/AsyncClusterConnection.java | 6 + .../hadoop/hbase/client/AsyncConnectionImpl.java | 4 + .../hbase/client/AsyncRegionServerAdmin.java | 210 +++++++++++++++++++++ .../org/apache/hadoop/hbase/util/FutureUtils.java | 2 +- .../org/apache/hadoop/hbase/master/HMaster.java | 15 +- .../apache/hadoop/hbase/master/ServerManager.java | 67 ------- .../master/procedure/RSProcedureDispatcher.java | 44 +++-- 7 files changed, 262 insertions(+), 86 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java index c7dea25..1327fd7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.yetus.audience.InterfaceAudience; @@ -27,6 +28,11 @@ import org.apache.yetus.audience.InterfaceAudience; public interface AsyncClusterConnection extends AsyncConnection { /** + * Get the admin service for the given region server. + */ + AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName); + + /** * Get the nonce generator for this connection. */ NonceGenerator getNonceGenerator(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 50e27c4..9bead83 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -359,4 +359,8 @@ class AsyncConnectionImpl implements AsyncClusterConnection { return new HBaseHbck(MasterProtos.HbckService.newBlockingStub( rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory); } + + public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) { + return new AsyncRegionServerAdmin(serverName, this); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java new file mode 100644 index 0000000..9accd89 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; + +/** + * A simple wrapper of the {@link AdminService} for a region server, which returns a + * {@link CompletableFuture}. This is easier to use, as if you use the raw protobuf interface, you + * need to get the result from the {@link RpcCallback}, and if there is an exception, you need to + * get it from the {@link RpcController} passed in. + * <p/> + * Notice that there is no retry, and this is intentional. We have different retry for different + * usage for now, if later we want to unify them, we can move the retry logic into this class. + */ +@InterfaceAudience.Private +public class AsyncRegionServerAdmin { + + private final ServerName server; + + private final AsyncConnectionImpl conn; + + AsyncRegionServerAdmin(ServerName server, AsyncConnectionImpl conn) { + this.server = server; + this.conn = conn; + } + + @FunctionalInterface + private interface RpcCall<RESP> { + void call(AdminService.Interface stub, HBaseRpcController controller, RpcCallback<RESP> done); + } + + private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall) { + CompletableFuture<RESP> future = new CompletableFuture<>(); + HBaseRpcController controller = conn.rpcControllerFactory.newController(); + try { + rpcCall.call(conn.getAdminStub(server), controller, new RpcCallback<RESP>() { + + @Override + public void run(RESP resp) { + if (controller.failed()) { + future.completeExceptionally(controller.getFailed()); + } else { + future.complete(resp); + } + } + }); + } catch (IOException e) { + future.completeExceptionally(e); + } + return future; + } + + public CompletableFuture<GetRegionInfoResponse> getRegionInfo(GetRegionInfoRequest request) { + return call((stub, controller, done) -> stub.getRegionInfo(controller, request, done)); + } + + public CompletableFuture<GetStoreFileResponse> getStoreFile(GetStoreFileRequest request) { + return call((stub, controller, done) -> stub.getStoreFile(controller, request, done)); + } + + public CompletableFuture<GetOnlineRegionResponse> getOnlineRegion( + GetOnlineRegionRequest request) { + return call((stub, controller, done) -> stub.getOnlineRegion(controller, request, done)); + } + + public CompletableFuture<OpenRegionResponse> openRegion(OpenRegionRequest request) { + return call((stub, controller, done) -> stub.openRegion(controller, request, done)); + } + + public CompletableFuture<WarmupRegionResponse> warmupRegion(WarmupRegionRequest request) { + return call((stub, controller, done) -> stub.warmupRegion(controller, request, done)); + } + + public CompletableFuture<CloseRegionResponse> closeRegion(CloseRegionRequest request) { + return call((stub, controller, done) -> stub.closeRegion(controller, request, done)); + } + + public CompletableFuture<FlushRegionResponse> flushRegion(FlushRegionRequest request) { + return call((stub, controller, done) -> stub.flushRegion(controller, request, done)); + } + + public CompletableFuture<CompactionSwitchResponse> compactionSwitch( + CompactionSwitchRequest request) { + return call((stub, controller, done) -> stub.compactionSwitch(controller, request, done)); + } + + public CompletableFuture<CompactRegionResponse> compactRegion(CompactRegionRequest request) { + return call((stub, controller, done) -> stub.compactRegion(controller, request, done)); + } + + public CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry( + ReplicateWALEntryRequest request) { + return call((stub, controller, done) -> stub.replicateWALEntry(controller, request, done)); + } + + public CompletableFuture<ReplicateWALEntryResponse> replay(ReplicateWALEntryRequest request) { + return call((stub, controller, done) -> stub.replay(controller, request, done)); + } + + public CompletableFuture<RollWALWriterResponse> rollWALWriter(RollWALWriterRequest request) { + return call((stub, controller, done) -> stub.rollWALWriter(controller, request, done)); + } + + public CompletableFuture<GetServerInfoResponse> getServerInfo(GetServerInfoRequest request) { + return call((stub, controller, done) -> stub.getServerInfo(controller, request, done)); + } + + public CompletableFuture<StopServerResponse> stopServer(StopServerRequest request) { + return call((stub, controller, done) -> stub.stopServer(controller, request, done)); + } + + public CompletableFuture<UpdateFavoredNodesResponse> updateFavoredNodes( + UpdateFavoredNodesRequest request) { + return call((stub, controller, done) -> stub.updateFavoredNodes(controller, request, done)); + } + + public CompletableFuture<UpdateConfigurationResponse> updateConfiguration( + UpdateConfigurationRequest request) { + return call((stub, controller, done) -> stub.updateConfiguration(controller, request, done)); + } + + public CompletableFuture<GetRegionLoadResponse> getRegionLoad(GetRegionLoadRequest request) { + return call((stub, controller, done) -> stub.getRegionLoad(controller, request, done)); + } + + public CompletableFuture<ClearCompactionQueuesResponse> clearCompactionQueues( + ClearCompactionQueuesRequest request) { + return call((stub, controller, done) -> stub.clearCompactionQueues(controller, request, done)); + } + + public CompletableFuture<ClearRegionBlockCacheResponse> clearRegionBlockCache( + ClearRegionBlockCacheRequest request) { + return call((stub, controller, done) -> stub.clearRegionBlockCache(controller, request, done)); + } + + public CompletableFuture<GetSpaceQuotaSnapshotsResponse> getSpaceQuotaSnapshots( + GetSpaceQuotaSnapshotsRequest request) { + return call((stub, controller, done) -> stub.getSpaceQuotaSnapshots(controller, request, done)); + } + + public CompletableFuture<ExecuteProceduresResponse> executeProcedures( + ExecuteProceduresRequest request) { + return call((stub, controller, done) -> stub.executeProcedures(controller, request, done)); + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java index 067e66b..f4a7332 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java @@ -57,4 +57,4 @@ public final class FutureUtils { } }); } -} \ No newline at end of file +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 7579fd5..cf56c4f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -195,6 +195,7 @@ import org.apache.hadoop.hbase.util.BloomFilterUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CompressionTest; import org.apache.hadoop.hbase.util.EncryptionTest; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.HBaseFsck; import org.apache.hadoop.hbase.util.HFileArchiveUtil; import org.apache.hadoop.hbase.util.HasThread; @@ -227,6 +228,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.collect.Maps; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceViolationPolicy; @@ -1955,6 +1957,15 @@ public class HMaster extends HRegionServer implements MasterServices { }); } + private void warmUpRegion(ServerName server, RegionInfo region) { + FutureUtils.addListener(asyncClusterConnection.getRegionServerAdmin(server) + .warmupRegion(RequestConverter.buildWarmupRegionRequest(region)), (r, e) -> { + if (e != null) { + LOG.warn("Failed to warm up region {} on server {}", region, server, e); + } + }); + } + // Public so can be accessed by tests. Blocks until move is done. // Replace with an async implementation from which you can get // a success/failure result. @@ -2026,7 +2037,9 @@ public class HMaster extends HRegionServer implements MasterServices { // Warmup the region on the destination before initiating the move. this call // is synchronous and takes some time. doing it before the source region gets // closed - serverManager.sendRegionWarmup(rp.getDestination(), hri); + // A region server could reject the close request because it either does not + // have the specified region or the region is being split. + warmUpRegion(rp.getDestination(), hri); LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer"); Future<byte []> future = this.assignmentManager.moveAsync(rp); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 86d72d1..c26ef6c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -51,12 +50,9 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.YouAreDeadException; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.ipc.HBaseRpcController; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.master.assignment.RegionStates; import org.apache.hadoop.hbase.monitoring.MonitoredTask; -import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -159,25 +155,16 @@ public class ServerManager { private final ConcurrentNavigableMap<ServerName, ServerMetrics> onlineServers = new ConcurrentSkipListMap<>(); - /** - * Map of admin interfaces per registered regionserver; these interfaces we use to control - * regionservers out on the cluster - */ - private final Map<ServerName, AdminService.BlockingInterface> rsAdmins = new HashMap<>(); - /** List of region servers that should not get any more new regions. */ private final ArrayList<ServerName> drainingServers = new ArrayList<>(); private final MasterServices master; - private final ClusterConnection connection; private final DeadServer deadservers = new DeadServer(); private final long maxSkew; private final long warningSkew; - private final RpcControllerFactory rpcControllerFactory; - /** Listeners that are called on server events. */ private List<ServerListener> listeners = new CopyOnWriteArrayList<>(); @@ -189,8 +176,6 @@ public class ServerManager { Configuration c = master.getConfiguration(); maxSkew = c.getLong("hbase.master.maxclockskew", 30000); warningSkew = c.getLong("hbase.master.warningclockskew", 10000); - this.connection = master.getClusterConnection(); - this.rpcControllerFactory = this.connection == null? null: connection.getRpcControllerFactory(); persistFlushedSequenceId = c.getBoolean(PERSIST_FLUSHEDSEQUENCEID, PERSIST_FLUSHEDSEQUENCEID_DEFAULT); } @@ -438,7 +423,6 @@ public class ServerManager { void recordNewServerWithLock(final ServerName serverName, final ServerMetrics sl) { LOG.info("Registering regionserver=" + serverName); this.onlineServers.put(serverName, sl); - this.rsAdmins.remove(serverName); } @VisibleForTesting @@ -633,7 +617,6 @@ public class ServerManager { this.onlineServers.remove(sn); onlineServers.notifyAll(); } - this.rsAdmins.remove(sn); } /* @@ -676,34 +659,6 @@ public class ServerManager { return this.drainingServers.add(sn); } - // RPC methods to region servers - - private HBaseRpcController newRpcController() { - return rpcControllerFactory == null ? null : rpcControllerFactory.newController(); - } - - /** - * Sends a WARMUP RPC to the specified server to warmup the specified region. - * <p> - * A region server could reject the close request because it either does not - * have the specified region or the region is being split. - * @param server server to warmup a region - * @param region region to warmup - */ - public void sendRegionWarmup(ServerName server, - RegionInfo region) { - if (server == null) return; - try { - AdminService.BlockingInterface admin = getRsAdmin(server); - HBaseRpcController controller = newRpcController(); - ProtobufUtil.warmupRegion(controller, admin, region); - } catch (IOException e) { - LOG.error("Received exception in RPC for warmup server:" + - server + "region: " + region + - "exception: " + e); - } - } - /** * Contacts a region server and waits up to timeout ms * to close the region. This bypasses the active hmaster. @@ -737,28 +692,6 @@ public class ServerManager { } /** - * @param sn - * @return Admin interface for the remote regionserver named <code>sn</code> - * @throws IOException - * @throws RetriesExhaustedException wrapping a ConnectException if failed - */ - public AdminService.BlockingInterface getRsAdmin(final ServerName sn) - throws IOException { - AdminService.BlockingInterface admin = this.rsAdmins.get(sn); - if (admin == null) { - LOG.debug("New admin connection to " + sn.toString()); - if (sn.equals(master.getServerName()) && master instanceof HRegionServer) { - // A master is also a region server now, see HBASE-10569 for details - admin = ((HRegionServer)master).getRSRpcServices(); - } else { - admin = this.connection.getAdmin(sn); - } - this.rsAdmins.put(sn, admin); - } - return admin; - } - - /** * Calculate min necessary to start. This is not an absolute. It is just * a friction that will cause us hang around a bit longer waiting on * RegionServers to check-in. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java index 638f9d3..f3ab4b3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java @@ -18,12 +18,15 @@ package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.List; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; @@ -37,11 +40,11 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.base.Throwables; import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap; import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; -import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; @@ -159,13 +162,8 @@ public class RSProcedureDispatcher this.serverName = serverName; } - protected AdminService.BlockingInterface getRsAdmin() throws IOException { - final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName); - if (admin == null) { - throw new IOException("Attempting to send OPEN RPC to server " + getServerName() + - " failed because no RPC connection found to this server"); - } - return admin; + protected AsyncRegionServerAdmin getRsAdmin() throws IOException { + return master.getAsyncClusterConnection().getRegionServerAdmin(serverName); } protected ServerName getServerName() { @@ -344,9 +342,13 @@ public class RSProcedureDispatcher protected ExecuteProceduresResponse sendRequest(final ServerName serverName, final ExecuteProceduresRequest request) throws IOException { try { - return getRsAdmin().executeProcedures(null, request); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); + return getRsAdmin().executeProcedures(request).get(); + } catch (InterruptedException e) { + throw (IOException) new InterruptedIOException().initCause(e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + Throwables.propagateIfPossible(cause, IOException.class); + throw new IOException(cause); } } @@ -407,9 +409,13 @@ public class RSProcedureDispatcher private OpenRegionResponse sendRequest(final ServerName serverName, final OpenRegionRequest request) throws IOException { try { - return getRsAdmin().openRegion(null, request); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); + return getRsAdmin().openRegion(request).get(); + } catch (InterruptedException e) { + throw (IOException) new InterruptedIOException().initCause(e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + Throwables.propagateIfPossible(cause, IOException.class); + throw new IOException(cause); } } @@ -453,9 +459,13 @@ public class RSProcedureDispatcher private CloseRegionResponse sendRequest(final ServerName serverName, final CloseRegionRequest request) throws IOException { try { - return getRsAdmin().closeRegion(null, request); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); + return getRsAdmin().closeRegion(request).get(); + } catch (InterruptedException e) { + throw (IOException) new InterruptedIOException().initCause(e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + Throwables.propagateIfPossible(cause, IOException.class); + throw new IOException(cause); } }