This is an automated email from the ASF dual-hosted git repository. sunxin pushed a commit to branch HBASE-24666 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit bd13d14bb59e5c4d1c69c19316f4bceb54ef7703 Author: XinSun <ddu...@gmail.com> AuthorDate: Wed Oct 28 18:59:57 2020 +0800 HBASE-24999 Master manages ReplicationServers (#2579) Signed-off-by: Guanghao Zhang <zg...@apache.org> --- .../server/master/ReplicationServerStatus.proto | 34 ++++ .../org/apache/hadoop/hbase/master/HMaster.java | 10 + .../hadoop/hbase/master/MasterRpcServices.java | 37 +++- .../apache/hadoop/hbase/master/MasterServices.java | 5 + .../hbase/master/ReplicationServerManager.java | 204 ++++++++++++++++++++ .../replication/HBaseReplicationEndpoint.java | 148 ++++++-------- .../hbase/replication/HReplicationServer.java | 214 ++++++++++++++++++++- .../HBaseInterClusterReplicationEndpoint.java | 1 - .../regionserver/ReplicationSyncUp.java | 4 +- .../hbase/master/MockNoopMasterServices.java | 5 + .../hbase/replication/TestReplicationBase.java | 2 + .../hbase/replication/TestReplicationServer.java | 57 +++++- 12 files changed, 619 insertions(+), 102 deletions(-) diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/ReplicationServerStatus.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/ReplicationServerStatus.proto new file mode 100644 index 0000000..d39a043 --- /dev/null +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/ReplicationServerStatus.proto @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +syntax = "proto2"; + +package hbase.pb; + +option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated"; +option java_outer_classname = "ReplicationServerStatusProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; + +import "server/master/RegionServerStatus.proto"; + +service ReplicationServerStatusService { + + rpc ReplicationServerReport(RegionServerReportRequest) + returns(RegionServerReportResponse); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 138a43f..5e8de56 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -303,6 +303,8 @@ public class HMaster extends HRegionServer implements MasterServices { // manager of assignment nodes in zookeeper private AssignmentManager assignmentManager; + // server manager to deal with replication server info + private ReplicationServerManager replicationServerManager; /** * Cache for the meta region replica's locations. 
Also tracks their changes to avoid stale @@ -866,6 +868,8 @@ public class HMaster extends HRegionServer implements MasterServices { .collect(Collectors.toList()); this.assignmentManager.setupRIT(ritList); + this.replicationServerManager = new ReplicationServerManager(this); + // Start RegionServerTracker with listing of servers found with exiting SCPs -- these should // be registered in the deadServers set -- and with the list of servernames out on the // filesystem that COULD BE 'alive' (we'll schedule SCPs for each and let SCP figure it out). @@ -1024,6 +1028,7 @@ public class HMaster extends HRegionServer implements MasterServices { this.hbckChore = new HbckChore(this); getChoreService().scheduleChore(hbckChore); this.serverManager.startChore(); + this.replicationServerManager.startChore(); // Only for rolling upgrade, where we need to migrate the data in namespace table to meta table. if (!waitForNamespaceOnline()) { @@ -1283,6 +1288,11 @@ public class HMaster extends HRegionServer implements MasterServices { } @Override + public ReplicationServerManager getReplicationServerManager() { + return this.replicationServerManager; + } + + @Override public MasterFileSystem getMasterFileSystem() { return this.fileSystemManager; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index df5f0b0..93f368f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -401,6 +401,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Trans import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerStatusProtos.ReplicationServerStatusService; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; import org.apache.hadoop.hbase.shaded.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsService; @@ -412,7 +413,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.VisibilityLabelsProtos. public class MasterRpcServices extends RSRpcServices implements MasterService.BlockingInterface, RegionServerStatusService.BlockingInterface, LockService.BlockingInterface, HbckService.BlockingInterface, - ClientMetaService.BlockingInterface { + ClientMetaService.BlockingInterface, ReplicationServerStatusService.BlockingInterface { private static final Logger LOG = LoggerFactory.getLogger(MasterRpcServices.class.getName()); private static final Logger AUDITLOG = @@ -546,7 +547,7 @@ public class MasterRpcServices extends RSRpcServices implements */ @Override protected List<BlockingServiceAndInterface> getServices() { - List<BlockingServiceAndInterface> bssi = new ArrayList<>(5); + List<BlockingServiceAndInterface> bssi = new ArrayList<>(6); bssi.add(new BlockingServiceAndInterface( MasterService.newReflectiveBlockingService(this), MasterService.BlockingInterface.class)); @@ -559,6 +560,9 @@ public class MasterRpcServices extends RSRpcServices implements HbckService.BlockingInterface.class)); bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this), ClientMetaService.BlockingInterface.class)); + bssi.add(new BlockingServiceAndInterface( + ReplicationServerStatusService.newReflectiveBlockingService(this), + ReplicationServerStatusService.BlockingInterface.class)); bssi.addAll(super.getServices()); return bssi; } @@ -3414,4 +3418,33 @@ public class MasterRpcServices extends RSRpcServices implements } return 
builder.build(); } + + @Override + public RegionServerReportResponse replicationServerReport(RpcController controller, + RegionServerReportRequest request) throws ServiceException { + try { + master.checkServiceStarted(); + int versionNumber = 0; + String version = "0.0.0"; + VersionInfo versionInfo = VersionInfoUtil.getCurrentClientVersionInfo(); + if (versionInfo != null) { + version = versionInfo.getVersion(); + versionNumber = VersionInfoUtil.getVersionNumber(versionInfo); + } + ClusterStatusProtos.ServerLoad sl = request.getLoad(); + ServerName serverName = ProtobufUtil.toServerName(request.getServer()); + ServerMetrics oldMetrics = master.getReplicationServerManager().getServerMetrics(serverName); + ServerMetrics newMetrics = + ServerMetricsBuilder.toServerMetrics(serverName, versionNumber, version, sl); + master.getReplicationServerManager().serverReport(serverName, newMetrics); + if (sl != null && master.metricsMaster != null) { + // Up our metrics. + master.metricsMaster.incrementRequests(sl.getTotalNumberOfRequests() + - (oldMetrics != null ? oldMetrics.getRequestCount() : 0)); + } + } catch (IOException ioe) { + throw new ServiceException(ioe); + } + return RegionServerReportResponse.newBuilder().build(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 3f7dc02..bb8fdca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -103,6 +103,11 @@ public interface MasterServices extends Server { ServerManager getServerManager(); /** + * @return Master's {@link ReplicationServerManager} instance. 
+ */ + ReplicationServerManager getReplicationServerManager(); + + /** * @return Master's instance of {@link ExecutorService} */ ExecutorService getExecutorService(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ReplicationServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ReplicationServerManager.java new file mode 100644 index 0000000..273b7f2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ReplicationServerManager.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.ServerMetrics; +import org.apache.hadoop.hbase.ServerName; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The ReplicationServerManager class manages info about replication servers. 
+ * <p> + * Maintains lists of online and dead servers. + * <p> + * Servers are distinguished in two different ways. A given server has a + * location, specified by hostname and port, and of which there can only be one + * online at any given time. A server instance is specified by the location + * (hostname and port) as well as the startcode (timestamp from when the server + * was started). This is used to differentiate a restarted instance of a given + * server from the original instance. + */ +@InterfaceAudience.Private +public class ReplicationServerManager { + + private static final Logger LOG = LoggerFactory.getLogger(ReplicationServerManager.class); + + public static final String ONLINE_SERVER_REFRESH_INTERVAL = + "hbase.master.replication.server.refresh.interval"; + public static final int ONLINE_SERVER_REFRESH_INTERVAL_DEFAULT = 60 * 1000; // 1 mins + + private final MasterServices master; + + /** Map of registered servers to their current load */ + private final ConcurrentNavigableMap<ServerName, ServerMetrics> onlineServers = + new ConcurrentSkipListMap<>(); + + private OnlineServerRefresher onlineServerRefresher; + private int refreshPeriod; + + /** + * Constructor. + */ + public ReplicationServerManager(final MasterServices master) { + this.master = master; + } + + /** + * start chore in ServerManager + */ + public void startChore() { + Configuration conf = master.getConfiguration(); + refreshPeriod = conf.getInt(ONLINE_SERVER_REFRESH_INTERVAL, + ONLINE_SERVER_REFRESH_INTERVAL_DEFAULT); + onlineServerRefresher = new OnlineServerRefresher("ReplicationServerRefresher", refreshPeriod); + master.getChoreService().scheduleChore(onlineServerRefresher); + } + + /** + * Stop the ServerManager. 
+ */ + public void stop() { + if (onlineServerRefresher != null) { + onlineServerRefresher.cancel(); + } + } + + public void serverReport(ServerName sn, ServerMetrics sl) { + if (null == this.onlineServers.replace(sn, sl)) { + if (!checkAndRecordNewServer(sn, sl)) { + LOG.info("ReplicationServerReport ignored, could not record the server: {}", sn); + } + } + } + + /** + * Check is a server of same host and port already exists, + * if not, or the existed one got a smaller start code, record it. + * + * @param serverName the server to check and record + * @param sl the server load on the server + * @return true if the server is recorded, otherwise, false + */ + private boolean checkAndRecordNewServer(final ServerName serverName, final ServerMetrics sl) { + ServerName existingServer = null; + synchronized (this.onlineServers) { + existingServer = findServerWithSameHostnamePort(serverName); + if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) { + LOG.info("ReplicationServer serverName={} rejected; we already have {} registered with " + + "same hostname and port", serverName, existingServer); + return false; + } + recordNewServer(serverName, sl); + // Note that we assume that same ts means same server, and don't expire in that case. + if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) { + LOG.info("Triggering server recovery; existingServer {} looks stale, new server: {}", + existingServer, serverName); + expireServer(existingServer); + } + } + return true; + } + + /** + * Assumes onlineServers is locked. + * @return ServerName with matching hostname and port. 
+ */ + private ServerName findServerWithSameHostnamePort(final ServerName serverName) { + ServerName end = ServerName.valueOf(serverName.getHostname(), serverName.getPort(), + Long.MAX_VALUE); + + ServerName r = onlineServers.lowerKey(end); + if (r != null && ServerName.isSameAddress(r, serverName)) { + return r; + } + return null; + } + + /** + * Assumes onlineServers is locked. + */ + private void recordNewServer(final ServerName serverName, final ServerMetrics sl) { + LOG.info("Registering ReplicationServer={}", serverName); + this.onlineServers.put(serverName, sl); + } + + /** + * Assumes onlineServers is locked. + * Expire the passed server. Remove it from list of online servers + */ + public void expireServer(final ServerName serverName) { + LOG.info("Expiring ReplicationServer={}", serverName); + onlineServers.remove(serverName); + } + + /** + * @return Read-only map of servers to serverinfo + */ + public Map<ServerName, ServerMetrics> getOnlineServers() { + // Presumption is that iterating the returned Map is OK. + synchronized (this.onlineServers) { + return Collections.unmodifiableMap(this.onlineServers); + } + } + + /** + * @return A copy of the internal list of online servers. 
+ */ + public List<ServerName> getOnlineServersList() { + return new ArrayList<>(this.onlineServers.keySet()); + } + + /** + * @param serverName server name + * @return ServerMetrics if serverName is known else null + */ + public ServerMetrics getServerMetrics(final ServerName serverName) { + return this.onlineServers.get(serverName); + } + + private class OnlineServerRefresher extends ScheduledChore { + + public OnlineServerRefresher(String name, int p) { + super(name, master, p, 60 * 1000); // delay one minute before first execute + } + + @Override + protected void chore() { + synchronized (onlineServers) { + List<ServerName> servers = getOnlineServersList(); + servers.forEach(s -> { + ServerMetrics metrics = onlineServers.get(s); + if (metrics.getReportTimestamp() + refreshPeriod < System.currentTimeMillis()) { + expireServer(s); + } + }); + } + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java index e788d8c..115df76 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java @@ -18,8 +18,8 @@ package org.apache.hadoop.hbase.replication; -import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT; -import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT; import java.io.IOException; import java.util.ArrayList; @@ -32,16 +32,15 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseConfiguration; +import 
org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.AsyncClusterConnection; import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin; import org.apache.hadoop.hbase.client.ClusterConnectionFactory; import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil; -import org.apache.hadoop.hbase.ScheduledChore; -import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; @@ -67,6 +66,13 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterServ /** * A {@link BaseReplicationEndpoint} for replication endpoints whose * target cluster is an HBase cluster. + * <p> + * Compatible with two implementations to fetch sink servers, fetching replication servers by + * accessing master and fetching region servers by listening to ZK. + * Give priority to fetch replication servers as sink servers by accessing master. if slave cluster + * isn't supported(version < 3.x) or exceptions occur, fetch region servers as sink servers via ZK. + * So we always register ZK listener, but ignored the ZK event if replication servers are available. 
+ * </p> */ @InterfaceAudience.Private public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint @@ -74,9 +80,6 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class); - public static final String FETCH_SERVERS_USE_ZK_CONF_KEY = - "hbase.replication.fetch.servers.usezk"; - public static final String FETCH_SERVERS_INTERVAL_CONF_KEY = "hbase.replication.fetch.servers.interval"; public static final int DEFAULT_FETCH_SERVERS_INTERVAL = 10 * 60 * 1000; // 10 mins @@ -112,10 +115,9 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint private List<ServerName> sinkServers = new ArrayList<>(0); - private AsyncClusterConnection peerConnection; - private boolean fetchServersUseZk = false; + private volatile boolean fetchServersUseZk = false; private FetchServersChore fetchServersChore; - private int shortOperationTimeout; + private int operationTimeout; /* * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different @@ -136,6 +138,8 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint this.badSinkThreshold = ctx.getConfiguration().getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD); this.badReportCounts = Maps.newHashMap(); + this.operationTimeout = ctx.getLocalConfiguration().getInt( + HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); } protected void disconnect() { @@ -144,20 +148,12 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint zkw.close(); } } - if (this.conn != null) { - try { - this.conn.close(); - this.conn = null; - } catch (IOException e) { - LOG.warn("{} Failed to close the connection", ctx.getPeerId()); - } - } if (fetchServersChore != null) { fetchServersChore.cancel(); } - if (peerConnection != null) { + if (conn != null) { try { - peerConnection.close(); + 
conn.close(); } catch (IOException e) { LOG.warn("Attempt to close peerConnection failed.", e); } @@ -192,27 +188,10 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint } @Override - protected synchronized void doStart() { - this.shortOperationTimeout = ctx.getLocalConfiguration().getInt( - HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT); + protected void doStart() { try { - if (ctx.getLocalConfiguration().getBoolean(FETCH_SERVERS_USE_ZK_CONF_KEY, false)) { - fetchServersUseZk = true; - } else { - try { - if (ReplicationUtils.isPeerClusterSupportReplicationOffload(getPeerConnection())) { - fetchServersChore = new FetchServersChore(ctx.getServer(), this); - ctx.getServer().getChoreService().scheduleChore(fetchServersChore); - fetchServersUseZk = false; - } else { - fetchServersUseZk = true; - } - } catch (Throwable t) { - fetchServersUseZk = true; - LOG.warn("Peer {} try to fetch servers by admin failed. Using zk impl.", - ctx.getPeerId(), t); - } - } + fetchServersChore = new FetchServersChore(ctx.getServer(), this); + ctx.getServer().getChoreService().scheduleChore(fetchServersChore); reloadZkWatcher(); connectPeerCluster(); notifyStarted(); @@ -255,9 +234,7 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint } zkw = new ZKWatcher(ctx.getConfiguration(), "connection to cluster: " + ctx.getPeerId(), this); - if (fetchServersUseZk) { - zkw.registerListener(new PeerRegionServerListener(this)); - } + zkw.registerListener(new PeerRegionServerListener(this)); } } @@ -283,38 +260,25 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint } /** - * Get the connection to peer cluster - * @return connection to peer cluster - * @throws IOException If anything goes wrong connecting - */ - private synchronized AsyncClusterConnection getPeerConnection() throws IOException { - if (peerConnection == null) { - Configuration conf = ctx.getConfiguration(); - 
peerConnection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null, - UserProvider.instantiate(conf).getCurrent()); - } - return peerConnection; - } - - /** * Get the list of all the servers that are responsible for replication sink * from the specified peer master - * @return list of server addresses or an empty list if the slave is unavailable + * @return list of server addresses */ - protected List<ServerName> fetchSlavesAddresses() { + protected List<ServerName> fetchSlavesAddresses() throws IOException { try { - AsyncClusterConnection peerConn = getPeerConnection(); - ServerName master = FutureUtils.get(peerConn.getAdmin().getMaster()); + ServerName master = FutureUtils.get(conn.getAdmin().getMaster()); MasterService.BlockingInterface masterStub = MasterService.newBlockingStub( - peerConn.getRpcClient() - .createBlockingRpcChannel(master, User.getCurrent(), shortOperationTimeout)); + conn.getRpcClient().createBlockingRpcChannel(master, User.getCurrent(), operationTimeout)); ListReplicationSinkServersResponse resp = masterStub .listReplicationSinkServers(null, ListReplicationSinkServersRequest.newBuilder().build()); return ProtobufUtil.toServerNameList(resp.getServerNameList()); - } catch (ServiceException | IOException e) { + } catch (ServiceException e) { LOG.error("Peer {} fetches servers failed", ctx.getPeerId(), e); + throw ProtobufUtil.getRemoteException(e); + } catch (IOException e) { + LOG.error("Peer {} fetches servers failed", ctx.getPeerId(), e); + throw e; } - return Collections.emptyList(); } /** @@ -344,20 +308,34 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint return addresses; } - protected synchronized void chooseSinks() { - List<ServerName> slaveAddresses = Collections.emptyList(); - if (fetchServersUseZk) { + protected void chooseSinks() { + List<ServerName> slaveAddresses = Collections.EMPTY_LIST; + boolean useZk = fetchServersUseZk; + try { + if (!useZk || 
ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) { + useZk = false; + slaveAddresses = fetchSlavesAddresses(); + } else { + useZk = true; + } + } catch (Throwable t) { + LOG.warn("Peer {} try to fetch servers by admin failed. Using zk impl.", ctx.getPeerId(), t); + useZk = true; + } + if (useZk) { slaveAddresses = fetchSlavesAddressesByZK(); - } else { - slaveAddresses = fetchSlavesAddresses(); } + if (slaveAddresses.isEmpty()) { LOG.warn("No sinks available at peer. Will not be able to replicate"); } Collections.shuffle(slaveAddresses, ThreadLocalRandom.current()); int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio); - this.sinkServers = slaveAddresses.subList(0, numSinks); - badReportCounts.clear(); + synchronized (this) { + this.fetchServersUseZk = useZk; + this.sinkServers = slaveAddresses.subList(0, numSinks); + badReportCounts.clear(); + } } protected synchronized int getNumSinks() { @@ -368,16 +346,18 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint * Get a randomly-chosen replication sink to replicate to. 
* @return a replication sink to replicate to */ - protected synchronized SinkPeer getReplicationSink() throws IOException { - if (sinkServers.isEmpty()) { - LOG.info("Current list of sinks is out of date or empty, updating"); - chooseSinks(); - } - if (sinkServers.isEmpty()) { - throw new IOException("No replication sinks are available"); + protected SinkPeer getReplicationSink() throws IOException { + ServerName serverName; + synchronized (this) { + if (sinkServers.isEmpty()) { + LOG.info("Current list of sinks is out of date or empty, updating"); + chooseSinks(); + } + if (sinkServers.isEmpty()) { + throw new IOException("No replication sinks are available"); + } + serverName = sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size())); } - ServerName serverName = - sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size())); return createSinkPeer(serverName); } @@ -438,7 +418,7 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint @Override public synchronized void nodeChildrenChanged(String path) { - if (path.equals(regionServerListNode)) { + if (replicationEndpoint.fetchServersUseZk && path.equals(regionServerListNode)) { LOG.info("Detected change to peer region servers, fetching updated list"); replicationEndpoint.chooseSinks(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java index 31dec0c..4b53bb7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; +import java.lang.management.MemoryUsage; import java.net.InetSocketAddress; import java.util.concurrent.atomic.AtomicBoolean; @@ -28,22 +29,37 @@ import 
org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.YouAreDeadException; import org.apache.hadoop.hbase.client.AsyncClusterConnection; import org.apache.hadoop.hbase.client.ClusterConnectionFactory; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.io.util.MemorySizeUtil; +import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.regionserver.ReplicationService; import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Sleeper; +import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerStatusProtos.ReplicationServerStatusService; + /** * HReplicationServer which is responsible to all replication stuff. It checks in with * the HMaster. 
There are many HReplicationServers in a single HBase deployment. @@ -86,9 +102,14 @@ public class HReplicationServer extends Thread implements Server { // A sleeper that sleeps for msgInterval. protected final Sleeper sleeper; + private final int shortOperationTimeout; + // zookeeper connection and watcher protected final ZKWatcher zooKeeper; + // master address tracker + private final MasterAddressTracker masterAddressTracker; + /** * The asynchronous cluster connection to be shared by services. */ @@ -98,6 +119,17 @@ public class HReplicationServer extends Thread implements Server { protected final ReplicationServerRpcServices rpcServices; + // Stub to do region server status calls against the master. + private volatile ReplicationServerStatusService.BlockingInterface rssStub; + + // RPC client. Used to make the stub above that does region server status checking. + private RpcClient rpcClient; + + /** + * ChoreService used to schedule tasks that we want to run periodically + */ + private ChoreService choreService; + public HReplicationServer(final Configuration conf) throws IOException { TraceUtil.initTracer(conf); try { @@ -116,16 +148,22 @@ public class HReplicationServer extends Thread implements Server { this.msgInterval = conf.getInt("hbase.replicationserver.msginterval", 3 * 1000); this.sleeper = new Sleeper(this.msgInterval, this); + this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT); + // Some unit tests don't need a cluster, so no zookeeper at all if (!conf.getBoolean("hbase.testing.nocluster", false)) { // Open connection to zookeeper and set primary watcher zooKeeper = new ZKWatcher(conf, getProcessName() + ":" + rpcServices.isa.getPort(), this, false); + masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this); + masterAddressTracker.start(); } else { zooKeeper = null; + masterAddressTracker = null; } - this.rpcServices.start(zooKeeper); + 
this.choreService = new ChoreService(getName(), true); } catch (Throwable t) { // Make sure we log the exception. HReplicationServer is often started via reflection and the // cause of failed startup is lost. @@ -150,6 +188,7 @@ public class HReplicationServer extends Thread implements Server { } catch (Throwable e) { abort("Fatal exception during initialization", e); } + try { setupReplication(); startReplicationService(); @@ -161,6 +200,7 @@ public class HReplicationServer extends Thread implements Server { while (!isStopped()) { long now = System.currentTimeMillis(); if ((now - lastMsg) >= msgInterval) { + tryReplicationServerReport(lastMsg, now); lastMsg = System.currentTimeMillis(); } if (!isStopped() && !isAborted()) { @@ -177,6 +217,22 @@ public class HReplicationServer extends Thread implements Server { abort(t.getMessage(), t); } + if (this.asyncClusterConnection != null) { + try { + this.asyncClusterConnection.close(); + } catch (IOException e) { + // Although the {@link Closeable} interface throws an {@link + // IOException}, in reality, the implementation would never do that. + LOG.warn("Attempt to close server's AsyncClusterConnection failed.", e); + } + } + if (rssStub != null) { + rssStub = null; + } + if (rpcClient != null) { + this.rpcClient.close(); + } + if (this.zooKeeper != null) { this.zooKeeper.close(); } @@ -204,11 +260,13 @@ public class HReplicationServer extends Thread implements Server { private void preRegistrationInitialization() { try { setupClusterConnection(); + // Setup RPC client for master communication + this.rpcClient = asyncClusterConnection.getRpcClient(); } catch (Throwable t) { // Call stop if error or process will stick around for ever since server // puts up non-daemon threads. this.rpcServices.stop(); - abort("Initialization of RS failed. Hence aborting RS.", t); + abort("Initialization of ReplicationServer failed. 
Hence aborting ReplicationServer.", t); } } @@ -272,7 +330,7 @@ public class HReplicationServer extends Thread implements Server { @Override public ChoreService getChoreService() { - return null; + return this.choreService; } @Override @@ -329,7 +387,7 @@ public class HReplicationServer extends Thread implements Server { throws IOException { // read in the name of the sink replication class from the config file. String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME, - HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); + HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT); server.replicationSinkService = newReplicationInstance(sinkClassname, ReplicationSinkService.class, conf, server); @@ -388,4 +446,152 @@ public class HReplicationServer extends Thread implements Server { protected boolean setAbortRequested() { return abortRequested.compareAndSet(false, true); } + + private void tryReplicationServerReport(long reportStartTime, long reportEndTime) + throws IOException { + ReplicationServerStatusService.BlockingInterface rss = rssStub; + if (rss == null) { + ServerName masterServerName = createReplicationServerStatusStub(true); + rss = rssStub; + if (masterServerName == null || rss == null) { + return; + } + } + ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime); + try { + RegionServerReportRequest.Builder request = RegionServerReportRequest + .newBuilder(); + request.setServer(ProtobufUtil.toServerName(this.serverName)); + request.setLoad(sl); + rss.replicationServerReport(null, request.build()); + } catch (ServiceException se) { + IOException ioe = ProtobufUtil.getRemoteException(se); + if (ioe instanceof YouAreDeadException) { + // This will be caught and handled as a fatal error in run() + throw ioe; + } + if (rssStub == rss) { + rssStub = null; + } + // Couldn't connect to the master, get location from zk and reconnect + // Method blocks until new master is found or we are stopped + 
createReplicationServerStatusStub(true); + } + } + + private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime) { + long usedMemory = -1L; + long maxMemory = -1L; + final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage(); + if (usage != null) { + usedMemory = usage.getUsed(); + maxMemory = usage.getMax(); + } + + ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder(); + serverLoad.setTotalNumberOfRequests(rpcServices.requestCount.sum()); + serverLoad.setUsedHeapMB((int) (usedMemory / 1024 / 1024)); + serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024)); + + serverLoad.setReportStartTime(reportStartTime); + serverLoad.setReportEndTime(reportEndTime); + + // for the replicationLoad purpose. Only need to get from one executorService + // either source or sink will get the same info + ReplicationSinkService sinks = getReplicationSinkService(); + + if (sinks != null) { + // always refresh first to get the latest value + ReplicationLoad rLoad = sinks.refreshAndGetReplicationLoad(); + if (rLoad != null) { + serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink()); + } + } + return serverLoad.build(); + } + + /** + * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh + * connection, the current rssStub must be null. Method will block until a master is available. + * You can break from this block by requesting the server stop. 
+ * @param refresh If true then master address will be read from ZK, otherwise use cached data + * @return master + port, or null if server has been stopped + */ + private synchronized ServerName createReplicationServerStatusStub(boolean refresh) { + if (rssStub != null) { + return masterAddressTracker.getMasterAddress(); + } + ServerName sn = null; + long previousLogTime = 0; + ReplicationServerStatusService.BlockingInterface intRssStub = null; + boolean interrupted = false; + try { + while (keepLooping()) { + sn = this.masterAddressTracker.getMasterAddress(refresh); + if (sn == null) { + if (!keepLooping()) { + // give up with no connection. + LOG.debug("No master found and cluster is stopped; bailing out"); + return null; + } + if (System.currentTimeMillis() > (previousLogTime + 1000)) { + LOG.debug("No master found; retry"); + previousLogTime = System.currentTimeMillis(); + } + refresh = true; // let's try pull it from ZK directly + if (sleepInterrupted(200)) { + interrupted = true; + } + continue; + } + + try { + BlockingRpcChannel channel = + this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(), + shortOperationTimeout); + intRssStub = ReplicationServerStatusService.newBlockingStub(channel); + break; + } catch (IOException e) { + if (System.currentTimeMillis() > (previousLogTime + 1000)) { + e = e instanceof RemoteException ? + ((RemoteException)e).unwrapRemoteException() : e; + if (e instanceof ServerNotRunningYetException) { + LOG.info("Master isn't available yet, retrying"); + } else { + LOG.warn("Unable to connect to master. Retrying. Error was:", e); + } + previousLogTime = System.currentTimeMillis(); + } + if (sleepInterrupted(200)) { + interrupted = true; + } + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + this.rssStub = intRssStub; + return sn; + } + + /** + * @return True if we should keep looping, i.e. this server has not been stopped; + * false once the server has been stopped.
+ */ + private boolean keepLooping() { + return !this.stopped; + } + + private static boolean sleepInterrupted(long millis) { + boolean interrupted = false; + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + LOG.warn("Interrupted while sleeping"); + interrupted = true; + } + return interrupted; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index d8517b0..f0448b1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.ipc.RpcServer; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java index a43be29..55b20c6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java @@ -104,11 +104,13 @@ public class ReplicationSyncUp extends Configured implements Tool { class DummyServer implements Server { String hostname; ZKWatcher zkw; + ChoreService choreService; DummyServer(ZKWatcher zkw) { // a unique name in case the first run fails hostname = 
System.currentTimeMillis() + ".SyncUpTool.replication.org"; this.zkw = zkw; + this.choreService = new ChoreService("ReplicationSyncUpDummyServer", true); } DummyServer(String hostname) { @@ -160,7 +162,7 @@ public class ReplicationSyncUp extends Configured implements Tool { @Override public ChoreService getChoreService() { - return null; + return choreService; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index ed2edb0..52ca278 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java @@ -154,6 +154,11 @@ public class MockNoopMasterServices implements MasterServices { } @Override + public ReplicationServerManager getReplicationServerManager() { + return null; + } + + @Override public ZKWatcher getZooKeeper() { return null; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index eca0d67..78fec0e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.replication; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -198,6 +199,7 @@ public class TestReplicationBase { conf.setFloat("replication.source.ratio", 1.0f); conf.setBoolean("replication.source.eof.autorecovery", true); conf.setLong("hbase.serial.replication.waiting.ms", 100); + conf.setLong(HBASE_CLIENT_OPERATION_TIMEOUT, 5000); } static 
void configureClusters(HBaseTestingUtility util1, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java index 0ef23f2..30660c6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java @@ -17,7 +17,10 @@ */ package org.apache.hadoop.hbase.replication; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT; +import static org.apache.hadoop.hbase.master.ReplicationServerManager.ONLINE_SERVER_REFRESH_INTERVAL; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -38,7 +41,9 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.ReplicationServerManager; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.ReplicationServerSinkPeer; +import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.SinkPeer; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; @@ -74,6 +79,7 @@ public class TestReplicationServer { private static HMaster MASTER; private static HReplicationServer replicationServer; + private static ServerName replicationServerName; private static Path baseNamespaceDir; private static Path hfileArchiveDir; @@ -86,14 +92,11 @@ public class TestReplicationServer { @BeforeClass public static void beforeClass() throws Exception { + CONF.setLong(HBASE_CLIENT_OPERATION_TIMEOUT, 1000); + CONF.setLong(ONLINE_SERVER_REFRESH_INTERVAL, 10000); 
TEST_UTIL.startMiniCluster(); MASTER = TEST_UTIL.getMiniHBaseCluster().getMaster(); - - replicationServer = new HReplicationServer(CONF); - replicationServer.start(); - TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster(); - TEST_UTIL.waitFor(60000, () -> replicationServer.isOnline()); Path rootDir = CommonFSUtils.getRootDir(CONF); baseNamespaceDir = new Path(rootDir, new Path(HConstants.BASE_NAMESPACE_DIR)); @@ -108,6 +111,11 @@ public class TestReplicationServer { @Before public void before() throws Exception { + replicationServer = new HReplicationServer(CONF); + replicationServer.start(); + TEST_UTIL.waitFor(60000, () -> replicationServer.isOnline()); + replicationServerName = replicationServer.getServerName(); + TEST_UTIL.createTable(TABLENAME, FAMILY); TEST_UTIL.waitTableAvailable(TABLENAME); } @@ -115,6 +123,11 @@ public class TestReplicationServer { @After public void after() throws IOException { TEST_UTIL.deleteTableIfAny(TABLENAME); + if (!replicationServer.isStopped()) { + replicationServer.stop("test"); + } + replicationServer = null; + replicationServerName = null; } /** @@ -125,10 +138,10 @@ public class TestReplicationServer { AsyncClusterConnection conn = TEST_UTIL.getHBaseCluster().getMaster().getAsyncClusterConnection(); AsyncReplicationServerAdmin replAdmin = - conn.getReplicationServerAdmin(replicationServer.getServerName()); + conn.getReplicationServerAdmin(replicationServerName); ReplicationServerSinkPeer sinkPeer = - new ReplicationServerSinkPeer(replicationServer.getServerName(), replAdmin); + new ReplicationServerSinkPeer(replicationServerName, replAdmin); replicateWALEntryAndVerify(sinkPeer); } @@ -143,12 +156,11 @@ public class TestReplicationServer { .getRegionServer().getServerName(); AsyncReplicationServerAdmin replAdmin = conn.getReplicationServerAdmin(rs); - ReplicationServerSinkPeer - sinkPeer = new ReplicationServerSinkPeer(rs, replAdmin); + ReplicationServerSinkPeer sinkPeer = new ReplicationServerSinkPeer(rs, replAdmin); 
replicateWALEntryAndVerify(sinkPeer); } - private void replicateWALEntryAndVerify(ReplicationServerSinkPeer sinkPeer) throws Exception { + private void replicateWALEntryAndVerify(SinkPeer sinkPeer) throws Exception { Entry[] entries = new Entry[BATCH_SIZE]; for(int i = 0; i < BATCH_SIZE; i++) { entries[i] = generateEdit(i, TABLENAME, Bytes.toBytes(i)); @@ -175,4 +187,29 @@ public class TestReplicationServer { edit.add(new KeyValue(row, Bytes.toBytes(FAMILY), Bytes.toBytes(FAMILY), timestamp, row)); return new WAL.Entry(key, edit); } + + @Test + public void testReplicationServerReport() throws Exception { + ReplicationServerManager replicationServerManager = MASTER.getReplicationServerManager(); + assertNotNull(replicationServerManager); + TEST_UTIL.waitFor(60000, () -> !replicationServerManager.getOnlineServers().isEmpty() + && null != replicationServerManager.getServerMetrics(replicationServerName)); + // put data via replication server + testReplicateWAL(); + TEST_UTIL.waitFor(60000, () -> replicationServer.rpcServices.requestCount.sum() > 0 + && replicationServer.rpcServices.requestCount.sum() == replicationServerManager + .getServerMetrics(replicationServerName).getRequestCount()); + } + + @Test + public void testReplicationServerExpire() throws Exception { + ReplicationServerManager replicationServerManager = MASTER.getReplicationServerManager(); + TEST_UTIL.waitFor(60000, () -> !replicationServerManager.getOnlineServers().isEmpty() + && null != replicationServerManager.getServerMetrics(replicationServerName)); + + replicationServer.stop("test"); + + TEST_UTIL.waitFor(180000, 1000, replicationServerManager.getOnlineServers()::isEmpty); + assertNull(replicationServerManager.getServerMetrics(replicationServerName)); + } }