This is an automated email from the ASF dual-hosted git repository. sunxin pushed a commit to branch HBASE-24666 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 1f11ee4ad71b032555b2e46f56d744ccf621fa61 Author: XinSun <ddu...@gmail.com> AuthorDate: Wed Sep 9 15:00:37 2020 +0800 HBASE-24982 Disassemble the method replicateWALEntry from AdminService to a new interface ReplicationServerService (#2360) Signed-off-by: Wellington Chevreuil <wchevre...@apache.org> --- .../hadoop/hbase/client/AsyncConnectionImpl.java | 16 ++ .../server/replication/ReplicationServer.proto | 32 ++++ .../hadoop/hbase/replication/ReplicationUtils.java | 19 ++ .../hbase/client/AsyncClusterConnection.java | 5 + .../hbase/client/AsyncClusterConnectionImpl.java | 5 + .../hbase/client/AsyncReplicationServerAdmin.java | 80 +++++++++ .../hbase/protobuf/ReplicationProtobufUtil.java | 18 ++ .../hadoop/hbase/regionserver/RSRpcServices.java | 8 +- .../replication/HBaseReplicationEndpoint.java | 57 +++++- .../replication/ReplicationServerRpcServices.java | 200 +-------------------- .../HBaseInterClusterReplicationEndpoint.java | 7 +- .../hbase/client/DummyAsyncClusterConnection.java | 5 + .../replication/TestHBaseReplicationEndpoint.java | 17 +- .../hbase/replication/TestReplicationServer.java | 43 ++++- 14 files changed, 288 insertions(+), 224 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 8a1ac5a..5a332d8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminServic import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService; /** * The implementation of AsyncConnection. @@ -101,6 +102,8 @@ class AsyncConnectionImpl implements AsyncConnection { private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>(); private final ConcurrentMap<String, AdminService.Interface> adminSubs = new ConcurrentHashMap<>(); + private final ConcurrentMap<String, ReplicationServerService.Interface> replStubs = + new ConcurrentHashMap<>(); private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>(); @@ -278,12 +281,25 @@ class AsyncConnectionImpl implements AsyncConnection { return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); } + private ReplicationServerService.Interface createReplicationServerStub(ServerName serverName) + throws IOException { + return ReplicationServerService.newStub( + rpcClient.createRpcChannel(serverName, user, rpcTimeout)); + } + AdminService.Interface getAdminStub(ServerName serverName) throws IOException { return ConcurrentMapUtils.computeIfAbsentEx(adminSubs, getStubKey(AdminService.getDescriptor().getName(), serverName), () -> createAdminServerStub(serverName)); } + ReplicationServerService.Interface getReplicationServerStub(ServerName serverName) + throws IOException { + return ConcurrentMapUtils.computeIfAbsentEx(replStubs, + getStubKey(ReplicationServerService.Interface.class.getSimpleName(), serverName, + hostnameCanChange), () -> createReplicationServerStub(serverName)); + } + CompletableFuture<MasterService.Interface> getMasterStub() { return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> { CompletableFuture<MasterService.Interface> future = new CompletableFuture<>(); diff --git a/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto b/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto new file mode 100644 index 0000000..ed334c4 --- /dev/null +++ b/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +syntax = "proto2"; +package hbase.pb; + +option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated"; +option java_outer_classname = "ReplicationServerProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; + +import "server/region/Admin.proto"; + +service ReplicationServerService { + rpc ReplicateWALEntry(ReplicateWALEntryRequest) + returns(ReplicateWALEntryResponse); +} \ No newline at end of file diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java index a786206..7bafbc2 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java @@ -30,6 +30,9 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.AsyncAdmin; +import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -212,4 +215,20 @@ public final class ReplicationUtils { } return initialValue * HConstants.RETRY_BACKOFF[ntries]; } + + /** + * Check whether peer cluster supports replication offload. + * @param peerConn connection for peer cluster + * @return true if peer cluster version >= 3 + * @throws IOException exception + */ + public static boolean isPeerClusterSupportReplicationOffload(AsyncConnection peerConn) + throws IOException { + AsyncAdmin admin = peerConn.getAdmin(); + String version = FutureUtils.get(admin.getClusterMetrics()).getHBaseVersion(); + if (Integer.parseInt(version.split("\\.")[0]) >= 3) { + return true; + } + return false; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java index 92118ac..b6a3b97 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java @@ -42,6 +42,11 @@ public interface AsyncClusterConnection extends AsyncConnection { AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName); /** + * Get the admin service for the give replication server. + */ + AsyncReplicationServerAdmin getReplicationServerAdmin(ServerName serverName); + + /** * Get the nonce generator for this connection. */ NonceGenerator getNonceGenerator(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java index 39fc3a2..e4c2ee3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java @@ -71,6 +71,11 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu } @Override + public AsyncReplicationServerAdmin getReplicationServerAdmin(ServerName serverName) { + return new AsyncReplicationServerAdmin(serverName, this); + } + + @Override public CompletableFuture<FlushRegionResponse> flush(byte[] regionName, boolean writeFlushWALMarker) { RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncReplicationServerAdmin.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncReplicationServerAdmin.java new file mode 100644 index 0000000..7511a64 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncReplicationServerAdmin.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService; + +/** + * A simple wrapper of the {@link ReplicationServerService} for a replication server. + * <p/> + * Notice that there is no retry, and this is intentional. + */ +@InterfaceAudience.Private +public class AsyncReplicationServerAdmin { + + private final ServerName server; + + private final AsyncConnectionImpl conn; + + AsyncReplicationServerAdmin(ServerName server, AsyncConnectionImpl conn) { + this.server = server; + this.conn = conn; + } + + @FunctionalInterface + private interface RpcCall<RESP> { + void call(ReplicationServerService.Interface stub, HBaseRpcController controller, + RpcCallback<RESP> done); + } + + private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall, CellScanner cellScanner) { + CompletableFuture<RESP> future = new CompletableFuture<>(); + HBaseRpcController controller = conn.rpcControllerFactory.newController(cellScanner); + try { + rpcCall.call(conn.getReplicationServerStub(server), controller, resp -> { + if (controller.failed()) { + future.completeExceptionally(controller.getFailed()); + } else { + future.complete(resp); + } + }); + } catch (IOException e) { + future.completeExceptionally(e); + } + return future; + } + + public CompletableFuture<AdminProtos.ReplicateWALEntryResponse> replicateWALEntry( + AdminProtos.ReplicateWALEntryRequest request, CellScanner cellScanner, int timeout) { + return call((stub, controller, done) -> { + controller.setCallTimeout(timeout); + stub.replicateWALEntry(controller, request, done); + }, cellScanner); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java index e47c929..17f48a6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; +import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin; import org.apache.hadoop.hbase.io.SizedCellScanner; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; import org.apache.hadoop.hbase.util.FutureUtils; @@ -62,6 +63,23 @@ public class ReplicationProtobufUtil { } /** + * A helper to replicate a list of WAL entries using replication server admin + * @param admin the replication server admin + * @param entries Array of WAL entries to be replicated + * @param replicationClusterId Id which will uniquely identify source cluster FS client + * configurations in the replication configuration directory + * @param sourceBaseNamespaceDir Path to source cluster base namespace directory + * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory + */ + public static void replicateWALEntry(AsyncReplicationServerAdmin admin, Entry[] entries, + String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, + int timeout) throws IOException { + Pair<ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(entries, null, + replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir); + FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout)); + } + + /** * Create a new ReplicateWALEntryRequest from a list of WAL entries * @param entries the WAL entries to be replicated * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values found. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 54395c3..a4a735b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -257,6 +257,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuo import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService; import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; @@ -1566,8 +1567,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } if (admin) { bssi.add(new BlockingServiceAndInterface( - AdminService.newReflectiveBlockingService(this), - AdminService.BlockingInterface.class)); + AdminService.newReflectiveBlockingService(this), + AdminService.BlockingInterface.class)); + bssi.add(new BlockingServiceAndInterface( + ReplicationServerService.newReflectiveBlockingService(this), + ReplicationServerService.BlockingInterface.class)); } return new org.apache.hbase.thirdparty.com.google.common.collect. ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java index 8678685..f38fd08 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java @@ -26,11 +26,15 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.AsyncClusterConnection; import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; +import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin; import org.apache.hadoop.hbase.client.ClusterConnectionFactory; +import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.zookeeper.ZKListener; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.ServerName; @@ -278,7 +282,7 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint } ServerName serverName = sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size())); - return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName)); + return createSinkPeer(serverName); } /** @@ -340,21 +344,60 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint /** * Wraps a replication region server sink to provide the ability to identify it. */ - public static class SinkPeer { + public static abstract class SinkPeer { private ServerName serverName; - private AsyncRegionServerAdmin regionServer; - public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) { + public SinkPeer(ServerName serverName) { this.serverName = serverName; - this.regionServer = regionServer; } ServerName getServerName() { return serverName; } - public AsyncRegionServerAdmin getRegionServer() { - return regionServer; + public abstract void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId, + Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) throws IOException; + } + + public static class RegionServerSinkPeer extends SinkPeer { + + private AsyncRegionServerAdmin regionServer; + + public RegionServerSinkPeer(ServerName serverName, + AsyncRegionServerAdmin replicationServer) { + super(serverName); + this.regionServer = replicationServer; + } + + public void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId, + Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) throws IOException { + ReplicationProtobufUtil.replicateWALEntry(regionServer, entries, replicationClusterId, + sourceBaseNamespaceDir, sourceHFileArchiveDir, timeout); + } + } + + public static class ReplicationServerSinkPeer extends SinkPeer { + + private AsyncReplicationServerAdmin replicationServer; + + public ReplicationServerSinkPeer(ServerName serverName, + AsyncReplicationServerAdmin replicationServer) { + super(serverName); + this.replicationServer = replicationServer; + } + + public void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId, + Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) throws IOException { + ReplicationProtobufUtil.replicateWALEntry(replicationServer, entries, replicationClusterId, + sourceBaseNamespaceDir, sourceHFileArchiveDir, timeout); + } + } + + private SinkPeer createSinkPeer(ServerName serverName) throws IOException { + if (ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) { + return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName)); + } else { + return new RegionServerSinkPeer(serverName, conn.getRegionServerAdmin(serverName)); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java index 1b9b699..15d4f8c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java @@ -27,14 +27,12 @@ import java.util.concurrent.atomic.LongAdder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.PriorityFunction; -import org.apache.hadoop.hbase.ipc.QosPriority; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.RpcServerFactory; import org.apache.hadoop.hbase.ipc.RpcServerInterface; @@ -58,53 +56,11 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService; import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hbase.thirdparty.com.google.protobuf.Message; @@ -117,7 +73,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; @InterfaceAudience.Private @SuppressWarnings("deprecation") public class ReplicationServerRpcServices implements HBaseRPCErrorHandler, - AdminService.BlockingInterface, PriorityFunction { + ReplicationServerService.BlockingInterface, PriorityFunction { protected static final Logger LOG = LoggerFactory.getLogger(ReplicationServerRpcServices.class); @@ -256,8 +212,8 @@ public class ReplicationServerRpcServices implements HBaseRPCErrorHandler, protected List<BlockingServiceAndInterface> getServices() { List<BlockingServiceAndInterface> bssi = new ArrayList<>(); bssi.add(new BlockingServiceAndInterface( - AdminService.newReflectiveBlockingService(this), - AdminService.BlockingInterface.class)); + ReplicationServerService.newReflectiveBlockingService(this), + ReplicationServerService.BlockingInterface.class)); return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build(); } @@ -325,154 +281,6 @@ public class ReplicationServerRpcServices implements HBaseRPCErrorHandler, } } - @Override - public GetRegionInfoResponse getRegionInfo(RpcController controller, GetRegionInfoRequest request) - throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public GetStoreFileResponse getStoreFile(RpcController controller, GetStoreFileRequest request) - throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public GetOnlineRegionResponse getOnlineRegion(RpcController controller, - GetOnlineRegionRequest request) throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public OpenRegionResponse openRegion(RpcController controller, OpenRegionRequest request) - throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public WarmupRegionResponse warmupRegion(RpcController controller, WarmupRegionRequest request) - throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public CloseRegionResponse closeRegion(RpcController controller, CloseRegionRequest request) - throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public FlushRegionResponse flushRegion(RpcController controller, FlushRegionRequest request) - throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public CompactionSwitchResponse compactionSwitch(RpcController controller, - CompactionSwitchRequest request) throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public CompactRegionResponse compactRegion(RpcController controller, - CompactRegionRequest request) throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public ReplicateWALEntryResponse replay(RpcController controller, - ReplicateWALEntryRequest request) throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public RollWALWriterResponse rollWALWriter(RpcController controller, RollWALWriterRequest request) - throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public GetServerInfoResponse getServerInfo(RpcController controller, GetServerInfoRequest request) - throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - /** - * Stop the replication server. - * - * @param controller the RPC controller - * @param request the request - */ - @Override - @QosPriority(priority=HConstants.ADMIN_QOS) - public StopServerResponse stopServer(final RpcController controller, - final StopServerRequest request) { - requestCount.increment(); - String reason = request.getReason(); - replicationServer.stop(reason); - return StopServerResponse.newBuilder().build(); - } - - @Override - public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller, - UpdateFavoredNodesRequest request) throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public UpdateConfigurationResponse updateConfiguration(RpcController controller, - UpdateConfigurationRequest request) throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public GetRegionLoadResponse getRegionLoad(RpcController controller, - GetRegionLoadRequest request) throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller, - ClearCompactionQueuesRequest request) throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller, - ClearRegionBlockCacheRequest request) throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller, - GetSpaceQuotaSnapshotsRequest request) throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public ExecuteProceduresResponse executeProcedures(RpcController controller, - ExecuteProceduresRequest request) throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public SlowLogResponses getSlowLogResponses(RpcController controller, - SlowLogResponseRequest request) throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public SlowLogResponses getLargeLogResponses(RpcController controller, - SlowLogResponseRequest request) throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - - @Override - public ClearSlowLogResponses clearSlowLogsResponses(RpcController controller, - ClearSlowLogResponseRequest request) throws ServiceException { - throw new ServiceException(new UnsupportedOperationException("This's Replication Server")); - } - protected AccessChecker getAccessChecker() { return accessChecker; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index c77f74f..d8517b0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.ipc.RpcServer; -import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil; import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; @@ -561,11 +560,9 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId); } sinkPeer = getReplicationSink(); - AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer(); try { - ReplicationProtobufUtil.replicateWALEntry(rsAdmin, - entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir, - hfileArchiveDir, timeout); + sinkPeer.replicateWALEntry(entries.toArray(new Entry[entries.size()]), replicationClusterId, + baseNamespaceDir, hfileArchiveDir, timeout); if (LOG.isTraceEnabled()) { LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java index 8755749..5af4086 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java @@ -109,6 +109,11 @@ public class DummyAsyncClusterConnection implements AsyncClusterConnection { } @Override + public AsyncReplicationServerAdmin getReplicationServerAdmin(ServerName serverName) { + return null; + } + + @Override public NonceGenerator getNonceGenerator() { return null; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java index 4160141..4182eaf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java @@ -28,7 +28,8 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.AsyncClusterConnection; -import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; +import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin; +import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.ReplicationServerSinkPeer; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.SinkPeer; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -99,7 +100,7 @@ public class TestHBaseReplicationEndpoint { // Sanity check assertEquals(1, endpoint.getNumSinks()); - SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class)); + SinkPeer sinkPeer = mockSinkPeer(serverNameA); endpoint.reportBadSink(sinkPeer); // Just reporting a bad sink once shouldn't have an effect assertEquals(1, endpoint.getNumSinks()); @@ -123,7 +124,7 @@ public class TestHBaseReplicationEndpoint { assertEquals(expected, endpoint.getNumSinks()); ServerName badSinkServer0 = endpoint.getSinkServers().get(0); - SinkPeer sinkPeer = new SinkPeer(badSinkServer0, mock(AsyncRegionServerAdmin.class)); + SinkPeer sinkPeer = mockSinkPeer(badSinkServer0); for (int i = 0; i <= HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) { endpoint.reportBadSink(sinkPeer); } @@ -133,7 +134,7 @@ public class TestHBaseReplicationEndpoint { // now try a sink that has some successes ServerName badSinkServer1 = endpoint.getSinkServers().get(0); - sinkPeer = new SinkPeer(badSinkServer1, mock(AsyncRegionServerAdmin.class)); + sinkPeer = mockSinkPeer(badSinkServer1); for (int i = 0; i < HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) { endpoint.reportBadSink(sinkPeer); } @@ -168,8 +169,8 @@ public class TestHBaseReplicationEndpoint { ServerName serverNameA = endpoint.getSinkServers().get(0); ServerName serverNameB = endpoint.getSinkServers().get(1); - SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class)); - SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class)); + SinkPeer sinkPeerA = mockSinkPeer(serverNameA); + SinkPeer sinkPeerB = mockSinkPeer(serverNameB); for (int i = 0; i <= HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) { endpoint.reportBadSink(sinkPeerA); @@ -207,4 +208,8 @@ public class TestHBaseReplicationEndpoint { return null; } } + + private SinkPeer mockSinkPeer(ServerName serverName) { + return new ReplicationServerSinkPeer(serverName, mock(AsyncReplicationServerAdmin.class)); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java index 6a0ef3d..0ef23f2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java @@ -30,14 +30,15 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.AsyncClusterConnection; -import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; +import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; +import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.ReplicationServerSinkPeer; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; @@ -116,22 +117,48 @@ public class TestReplicationServer { TEST_UTIL.deleteTableIfAny(TABLENAME); } + /** + * Requests replication server using {@link AsyncReplicationServerAdmin} + */ @Test public void testReplicateWAL() throws Exception { - AsyncClusterConnection conn = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().get(0) - .getRegionServer().getAsyncClusterConnection(); - AsyncRegionServerAdmin rsAdmin = conn.getRegionServerAdmin(replicationServer.getServerName()); + AsyncClusterConnection conn = + TEST_UTIL.getHBaseCluster().getMaster().getAsyncClusterConnection(); + AsyncReplicationServerAdmin replAdmin = + conn.getReplicationServerAdmin(replicationServer.getServerName()); + + ReplicationServerSinkPeer sinkPeer = + new ReplicationServerSinkPeer(replicationServer.getServerName(), replAdmin); + replicateWALEntryAndVerify(sinkPeer); + } + + /** + * Requests region server using {@link AsyncReplicationServerAdmin} + */ + @Test + public void testReplicateWAL2() throws Exception { + AsyncClusterConnection conn = + TEST_UTIL.getHBaseCluster().getMaster().getAsyncClusterConnection(); + ServerName rs = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().get(0) + .getRegionServer().getServerName(); + AsyncReplicationServerAdmin replAdmin = conn.getReplicationServerAdmin(rs); + + ReplicationServerSinkPeer + sinkPeer = new ReplicationServerSinkPeer(rs, replAdmin); + replicateWALEntryAndVerify(sinkPeer); + } + private void replicateWALEntryAndVerify(ReplicationServerSinkPeer sinkPeer) throws Exception { Entry[] entries = new Entry[BATCH_SIZE]; for(int i = 0; i < BATCH_SIZE; i++) { entries[i] = generateEdit(i, TABLENAME, Bytes.toBytes(i)); } - ReplicationProtbufUtil.replicateWALEntry(rsAdmin, entries, replicationClusterId, - baseNamespaceDir, hfileArchiveDir, 1000); + sinkPeer.replicateWALEntry(entries, replicationClusterId, baseNamespaceDir, hfileArchiveDir, + 1000); + Table table = TEST_UTIL.getConnection().getTable(TABLENAME); for (int i = 0; i < BATCH_SIZE; i++) { - Table table = TEST_UTIL.getConnection().getTable(TABLENAME); Result result = table.get(new Get(Bytes.toBytes(i))); Cell cell = result.getColumnLatestCell(Bytes.toBytes(FAMILY), Bytes.toBytes(FAMILY)); assertNotNull(cell);