This is an automated email from the ASF dual-hosted git repository. sunxin pushed a commit to branch HBASE-24666 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit c8c85f4f4205ebcd6c7fbecf55072386c3bb842b Author: Guanghao Zhang <zg...@apache.org> AuthorDate: Mon Nov 9 11:46:02 2020 +0800 HBASE-25071 ReplicationServer support start ReplicationSource internal (#2452) Signed-off-by: XinSun <ddu...@gmail.com> --- .../server/replication/ReplicationServer.proto | 14 +- .../replication/ZKReplicationQueueStorage.java | 4 +- .../replication/ZKReplicationStorageBase.java | 4 + .../hadoop/hbase/master/MasterRpcServices.java | 2 +- .../hadoop/hbase/regionserver/RSRpcServices.java | 2 +- .../replication/HBaseReplicationEndpoint.java | 14 +- .../hbase/replication/HReplicationServer.java | 175 ++++++++++++++++++--- .../replication/ReplicationServerRpcServices.java | 15 ++ .../regionserver/RecoveredReplicationSource.java | 9 +- .../regionserver/ReplicationSource.java | 54 ++++++- .../regionserver/ReplicationSourceFactory.java | 2 +- .../regionserver/ReplicationSourceInterface.java | 6 +- .../regionserver/ReplicationSourceManager.java | 9 +- .../hbase/replication/ReplicationSourceDummy.java | 5 +- .../replication/TestReplicationFetchServers.java | 43 +++-- ...nServer.java => TestReplicationServerSink.java} | 25 +-- .../replication/TestReplicationServerSource.java | 69 ++++++++ .../regionserver/TestReplicationSource.java | 20 +-- .../regionserver/TestReplicationSourceManager.java | 18 ++- 19 files changed, 400 insertions(+), 90 deletions(-) diff --git a/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto b/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto index ed334c4..925aed4 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto @@ -24,9 +24,21 @@ option java_generic_services = true; option java_generate_equals_and_hash = true; option optimize_for = SPEED; +import "HBase.proto"; import "server/region/Admin.proto"; +message StartReplicationSourceRequest { + required ServerName server_name = 1; + required string queue_id = 2; +} + +message StartReplicationSourceResponse { +} + service ReplicationServerService { rpc ReplicateWALEntry(ReplicateWALEntryRequest) returns(ReplicateWALEntryResponse); -} \ No newline at end of file + + rpc StartReplicationSource(StartReplicationSourceRequest) + returns(StartReplicationSourceResponse); +} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java index 5c480ba..08ac142 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java @@ -79,7 +79,7 @@ import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUti * </pre> */ @InterfaceAudience.Private -class ZKReplicationQueueStorage extends ZKReplicationStorageBase +public class ZKReplicationQueueStorage extends ZKReplicationStorageBase implements ReplicationQueueStorage { private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationQueueStorage.class); @@ -121,7 +121,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName()); } - private String getQueueNode(ServerName serverName, String queueId) { + public String getQueueNode(ServerName serverName, String queueId) { return ZNodePaths.joinZNode(getRsNode(serverName), queueId); } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java index 596167f..a239bf8 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java @@ -74,4 +74,8 @@ public abstract class ZKReplicationStorageBase { throw new RuntimeException(e); } } + + public ZKWatcher getZookeeper() { + return this.zookeeper; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index c677458..c17d699 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -3475,7 +3475,7 @@ public class MasterRpcServices extends RSRpcServices implements if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().preListReplicationSinkServers(); } - builder.addAllServerName(master.listReplicationSinkServers().stream() + builder.addAllServerName(master.getReplicationServerManager().getOnlineServersList().stream() .map(ProtobufUtil::toServerName).collect(Collectors.toList())); if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().postListReplicationSinkServers(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 72fea23..91bf9cb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -273,7 +273,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe @SuppressWarnings("deprecation") public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction, - ConfigurationObserver, ReplicationServerService.BlockingInterface { + ConfigurationObserver { private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class); /** RPC scheduler to use for the region server. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java index 115df76..d17bb7f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java @@ -315,6 +315,10 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint if (!useZk || ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) { useZk = false; slaveAddresses = fetchSlavesAddresses(); + if (slaveAddresses.isEmpty()) { + LOG.warn("No sinks available at peer. Try fetch sinks by using zk."); + useZk = true; + } } else { useZk = true; } @@ -322,13 +326,15 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint LOG.warn("Peer {} try to fetch servers by admin failed. Using zk impl.", ctx.getPeerId(), t); useZk = true; } + if (useZk) { slaveAddresses = fetchSlavesAddressesByZK(); } if (slaveAddresses.isEmpty()) { - LOG.warn("No sinks available at peer. Will not be able to replicate"); + LOG.warn("No sinks available at peer. Will not be able to replicate."); } + Collections.shuffle(slaveAddresses, ThreadLocalRandom.current()); int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio); synchronized (this) { @@ -362,10 +368,10 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint } private SinkPeer createSinkPeer(ServerName serverName) throws IOException { - if (ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) { - return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName)); - } else { + if (fetchServersUseZk) { return new RegionServerSinkPeer(serverName, conn.getRegionServerAdmin(serverName)); + } else { + return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName)); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java index e679a98..2d0336d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java @@ -20,10 +20,19 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; import java.lang.management.MemoryUsage; import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.OptionalLong; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ChoreService; +import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; @@ -33,17 +42,30 @@ import org.apache.hadoop.hbase.YouAreDeadException; import org.apache.hadoop.hbase.client.AsyncClusterConnection; import org.apache.hadoop.hbase.client.ClusterConnectionFactory; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.util.MemorySizeUtil; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.regionserver.ReplicationService; import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; +import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource; +import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceFactory; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; +import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceFactory; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; +import org.apache.hadoop.hbase.security.SecurityConstants; +import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.Sleeper; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; +import org.apache.hadoop.hbase.zookeeper.ZKClusterId; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.ReflectionUtils; @@ -65,7 +87,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerStatus */ @InterfaceAudience.Private @SuppressWarnings({ "deprecation"}) -public class HReplicationServer extends Thread implements Server { +public class HReplicationServer extends Thread implements Server, ReplicationSourceController { private static final Logger LOG = LoggerFactory.getLogger(HReplicationServer.class); @@ -75,7 +97,7 @@ public class HReplicationServer extends Thread implements Server { /** * This servers start code. */ - protected final long startCode; + private final long startCode; private volatile boolean stopped = false; @@ -84,7 +106,11 @@ public class HReplicationServer extends Thread implements Server { private AtomicBoolean abortRequested; // flag set after we're done setting up server threads - final AtomicBoolean online = new AtomicBoolean(false); + private final AtomicBoolean online = new AtomicBoolean(false); + + private final int msgInterval; + // A sleeper that sleeps for msgInterval. + private final Sleeper sleeper; /** * The server name the Master sees us as. Its made from the hostname the @@ -93,18 +119,22 @@ public class HReplicationServer extends Thread implements Server { */ private ServerName serverName; - protected final Configuration conf; + private final Configuration conf; - private ReplicationSinkService replicationSinkService; + // zookeeper connection and watcher + private final ZKWatcher zooKeeper; - final int msgInterval; - // A sleeper that sleeps for msgInterval. - protected final Sleeper sleeper; + private final UUID clusterId; private final int shortOperationTimeout; - // zookeeper connection and watcher - protected final ZKWatcher zooKeeper; + private HFileSystem walFs; + private Path walRootDir; + + /** + * ChoreService used to schedule tasks that we want to run periodically + */ + private ChoreService choreService; // master address tracker private final MasterAddressTracker masterAddressTracker; @@ -112,11 +142,23 @@ public class HReplicationServer extends Thread implements Server { /** * The asynchronous cluster connection to be shared by services. */ - protected AsyncClusterConnection asyncClusterConnection; + private AsyncClusterConnection asyncClusterConnection; private UserProvider userProvider; - protected final ReplicationServerRpcServices rpcServices; + final ReplicationServerRpcServices rpcServices; + + // Total buffer size on this RegionServer for holding batched edits to be shipped. + private final long totalBufferLimit; + private AtomicLong totalBufferUsed = new AtomicLong(); + + private final MetricsReplicationGlobalSourceSource globalMetrics; + private final Map<String, MetricsSource> sourceMetrics = new HashMap<>(); + private final ConcurrentMap<String, ReplicationSourceInterface> sources = + new ConcurrentHashMap<>(); + + private final ReplicationQueueStorage queueStorage; + private final ReplicationPeers replicationPeers; // Stub to do region server status calls against the master. private volatile ReplicationServerStatusService.BlockingInterface rssStub; @@ -124,12 +166,9 @@ public class HReplicationServer extends Thread implements Server { // RPC client. Used to make the stub above that does region server status checking. private RpcClient rpcClient; - /** - * ChoreService used to schedule tasks that we want to run periodically - */ - private ChoreService choreService; + private ReplicationSinkService replicationSinkService; - public HReplicationServer(final Configuration conf) throws IOException { + public HReplicationServer(final Configuration conf) throws Exception { try { this.startCode = System.currentTimeMillis(); this.conf = conf; @@ -142,12 +181,29 @@ public class HReplicationServer extends Thread implements Server { serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startCode); this.userProvider = UserProvider.instantiate(conf); + // login the zookeeper client principal (if using security) + ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE, + HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName); + // login the server principal (if using secure Hadoop) + this.userProvider.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE, + SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, hostName); + // init superusers and add the server principal (if using security) + // or process owner as default super user. + Superusers.initialize(conf); this.msgInterval = conf.getInt("hbase.replicationserver.msginterval", 3 * 1000); this.sleeper = new Sleeper(this.msgInterval, this); this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT); + this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, + HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); + this.globalMetrics = + CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class) + .getGlobalSource(); + + initializeFileSystem(); + this.choreService = new ChoreService(getName(), true); // Some unit tests don't need a cluster, so no zookeeper at all if (!conf.getBoolean("hbase.testing.nocluster", false)) { @@ -160,6 +216,12 @@ public class HReplicationServer extends Thread implements Server { zooKeeper = null; masterAddressTracker = null; } + + this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zooKeeper, conf); + this.replicationPeers = + ReplicationFactory.getReplicationPeers(zooKeeper, this.conf); + this.replicationPeers.init(); + this.clusterId = ZKClusterId.getUUIDForCluster(zooKeeper); this.rpcServices.start(zooKeeper); this.choreService = new ChoreService(getName(), true); } catch (Throwable t) { @@ -170,6 +232,15 @@ public class HReplicationServer extends Thread implements Server { } } + private void initializeFileSystem() throws IOException { + // Get fs instance used by this RS. Do we use checksum verification in the hbase? If hbase + // checksum verification enabled, then automatically switch off hdfs checksum verification. + boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true); + CommonFSUtils.setFsDefault(this.conf, CommonFSUtils.getWALRootDir(this.conf)); + this.walFs = new HFileSystem(this.conf, useHBaseChecksum); + this.walRootDir = CommonFSUtils.getWALRootDir(this.conf); + } + public String getProcessName() { return REPLICATION_SERVER; } @@ -289,6 +360,9 @@ public class HReplicationServer extends Thread implements Server { if (this.replicationSinkService != null) { this.replicationSinkService.stopReplicationService(); } + if (this.choreService != null) { + this.choreService.shutdown(); + } } @Override @@ -328,7 +402,7 @@ public class HReplicationServer extends Thread implements Server { @Override public ChoreService getChoreService() { - return this.choreService; + return choreService; } @Override @@ -592,4 +666,69 @@ public class HReplicationServer extends Thread implements Server { } return interrupted; } + + @Override + public long getTotalBufferLimit() { + return this.totalBufferLimit; + } + + @Override + public AtomicLong getTotalBufferUsed() { + return this.totalBufferUsed; + } + + @Override + public MetricsReplicationGlobalSourceSource getGlobalMetrics() { + return this.globalMetrics; + } + + @Override + public void finishRecoveredSource(RecoveredReplicationSource src) { + this.sources.remove(src.getQueueId()); + this.sourceMetrics.remove(src.getQueueId()); + deleteQueue(src.getQueueId()); + LOG.info("Finished recovering queue {} with the following stats: {}", src.getQueueId(), + src.getStats()); + } + + public void startReplicationSource(ServerName producer, String queueId) + throws IOException, ReplicationException { + ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId); + String peerId = replicationQueueInfo.getPeerId(); + this.replicationPeers.addPeer(peerId); + Path walDir = + new Path(walRootDir, AbstractFSWALProvider.getWALDirectoryName(producer.toString())); + MetricsSource metrics = new MetricsSource(queueId); + + ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId); + // init replication source + src.init(conf, walFs, walDir, this, queueStorage, replicationPeers.getPeer(peerId), this, + producer, queueId, clusterId, p -> OptionalLong.empty(), metrics); + queueStorage.getWALsInQueue(producer, queueId) + .forEach(walName -> src.enqueueLog(new Path(walDir, walName))); + src.startup(); + sources.put(queueId, src); + sourceMetrics.put(queueId, metrics); + } + + /** + * Delete a complete queue of wals associated with a replication source + * @param queueId the id of replication queue to delete + */ + private void deleteQueue(String queueId) { + abortWhenFail(() -> this.queueStorage.removeQueue(getServerName(), queueId)); + } + + @FunctionalInterface + private interface ReplicationQueueOperation { + void exec() throws ReplicationException; + } + + private void abortWhenFail(ReplicationQueueOperation op) { + try { + op.exec(); + } catch (ReplicationException e) { + abort("Failed to operate on replication queue", e); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java index 15d4f8c..b8c3884 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java @@ -56,11 +56,14 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.StartReplicationSourceRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.StartReplicationSourceResponse; import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hbase.thirdparty.com.google.protobuf.Message; @@ -321,4 +324,16 @@ public class ReplicationServerRpcServices implements HBaseRPCErrorHandler, throw new ServiceException(ie); } } + + @Override + public StartReplicationSourceResponse startReplicationSource(RpcController controller, + StartReplicationSourceRequest request) throws ServiceException { + try { + replicationServer.startReplicationSource(ProtobufUtil.toServerName(request.getServerName()), + request.getQueueId()); + return StartReplicationSourceResponse.newBuilder().build(); + } catch (Exception e) { + throw new ServiceException(e); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index 7cb159e..147556f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -51,10 +51,11 @@ public class RecoveredReplicationSource extends ReplicationSource { @Override public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceController overallController, ReplicationQueueStorage queueStorage, - ReplicationPeer replicationPeer, Server server, String peerClusterZnode, UUID clusterId, - WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { - super.init(conf, fs, walDir, overallController, queueStorage, replicationPeer, server, - peerClusterZnode, clusterId, walFileLengthProvider, metrics); + ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId, + UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) + throws IOException { + super.init(conf, fs, walDir, overallController, queueStorage, replicationPeer, server, producer, + queueId, clusterId, walFileLengthProvider, metrics); this.actualPeerId = this.replicationQueueInfo.getPeerId(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 27f2ce7..0d9ee4b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -62,10 +62,13 @@ import org.apache.hadoop.hbase.replication.ReplicationSourceController; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter; +import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorage; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.zookeeper.ZKListener; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; @@ -149,6 +152,7 @@ public class ReplicationSource implements ReplicationSourceInterface { private int waitOnEndpointSeconds = -1; private Thread initThread; + private Thread fetchWALsThread; /** * WALs to replicate. @@ -186,8 +190,9 @@ public class ReplicationSource implements ReplicationSourceInterface { @Override public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceController overallController, ReplicationQueueStorage queueStorage, - ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, - WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { + ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId, + UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) + throws IOException { this.server = server; this.conf = HBaseConfiguration.create(conf); this.walDir = walDir; @@ -219,6 +224,19 @@ public class ReplicationSource implements ReplicationSourceInterface { this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort", true); + if (conf.getBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY, + HConstants.REPLICATION_OFFLOAD_ENABLE_DEFAULT)) { + if (queueStorage instanceof ZKReplicationQueueStorage) { + ZKReplicationQueueStorage zkQueueStorage = (ZKReplicationQueueStorage) queueStorage; + zkQueueStorage.getZookeeper().registerListener( + new ReplicationQueueListener(this, zkQueueStorage, producer, queueId, walDir)); + LOG.info("Register a ZKListener to track the WALs from {}'s replication queue, queueId={}", + producer, queueId); + } else { + throw new UnsupportedOperationException( + "hbase.replication.offload.enabled=true only support ZKReplicationQueueStorage"); + } + } LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId, replicationPeer.getId(), this.currentBandwidth); } @@ -928,4 +946,36 @@ public class ReplicationSource implements ReplicationSourceInterface { server.abort("Failed to operate on replication queue", e); } } + + /** + * Tracks changes to the WALs in the replication queue. + */ + public static class ReplicationQueueListener extends ZKListener { + + private final ReplicationSource source; + private final String queueNode; + private final Path walDir; + + public ReplicationQueueListener(ReplicationSource source, + ZKReplicationQueueStorage zkQueueStorage, ServerName producer, String queueId, Path walDir) { + super(zkQueueStorage.getZookeeper()); + this.source = source; + this.queueNode = zkQueueStorage.getQueueNode(producer, queueId); + this.walDir = walDir; + } + + @Override + public synchronized void nodeChildrenChanged(String path) { + if (path.equals(queueNode)) { + LOG.info("Detected change to the WALs in the replication queue {}", queueNode); + try { + ZKUtil.listChildrenNoWatch(watcher, queueNode).forEach(walName -> { + source.enqueueLog(new Path(walDir, walName)); + }); + } catch (KeeperException e) { + LOG.warn("Failed to read WALs in the replication queue {}", queueNode, e); + } + } + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java index 8863f14..56c8ee4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java @@ -35,7 +35,7 @@ public final class ReplicationSourceFactory { private ReplicationSourceFactory() {} - static ReplicationSourceInterface create(Configuration conf, String queueId) { + public static ReplicationSourceInterface create(Configuration conf, String queueId) { ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId); boolean isQueueRecovered = replicationQueueInfo.isQueueRecovered(); ReplicationSourceInterface src; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 296bd27..461276e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -50,6 +50,7 @@ public interface ReplicationSourceInterface { * @param queueStorage the replication queue storage * @param replicationPeer the replication peer * @param server the server which start and run this replication source + * @param producer the name of region server which produce WAL to the replication queue * @param queueId the id of our replication queue * @param clusterId unique UUID for the cluster * @param walFileLengthProvider used to get the WAL length @@ -57,8 +58,9 @@ public interface ReplicationSourceInterface { */ void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceController overallController, ReplicationQueueStorage queueStorage, - ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, - WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException; + ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId, + UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) + throws IOException; /** * Add a log to the list of logs to replicate diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index b6cb087..3dc2d12 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -304,8 +304,8 @@ public class ReplicationSourceManager implements ReplicationSourceController { WALFileLengthProvider walFileLengthProvider = this.walFactory.getWALProvider() != null? this.walFactory.getWALProvider().getWALFileLengthProvider() : p -> OptionalLong.empty(); - src.init(conf, fs, logDir, this, queueStorage, replicationPeer, server, queueId, clusterId, - walFileLengthProvider, new MetricsSource(queueId)); + src.init(conf, fs, logDir, this, queueStorage, replicationPeer, server, server.getServerName(), + queueId, clusterId, walFileLengthProvider, new MetricsSource(queueId)); return src; } @@ -925,8 +925,9 @@ public class ReplicationSourceManager implements ReplicationSourceController { CatalogReplicationSourcePeer peer = new CatalogReplicationSourcePeer(this.conf, this.clusterId.toString()); final ReplicationSourceInterface crs = new CatalogReplicationSource(); - crs.init(conf, fs, logDir, this, new NoopReplicationQueueStorage(), peer, server, peer.getId(), - clusterId, walProvider.getWALFileLengthProvider(), new MetricsSource(peer.getId())); + crs.init(conf, fs, logDir, this, new NoopReplicationQueueStorage(), peer, server, + server.getServerName(), peer.getId(), clusterId, walProvider.getWALFileLengthProvider(), + new MetricsSource(peer.getId())); // Add listener on the provider so we can pick up the WAL to replicate on roll. WALActionsListener listener = new WALActionsListener() { @Override public void postLogRoll(Path oldPath, Path newPath) throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index 8a32e94..8f28dee 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -48,8 +48,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { @Override public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceController overallController, ReplicationQueueStorage queueStorage, - ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, - WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { + ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId, + UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) + throws IOException { this.queueId = queueId; this.metrics = metrics; this.walFileLengthProvider = walFileLengthProvider; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java index 9ceacee..db4152e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java @@ -18,10 +18,10 @@ package org.apache.hadoop.hbase.replication; import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; @@ -38,13 +38,14 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; - -import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest; @@ -53,11 +54,14 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterServ @Category({ ReplicationTests.class, MediumTests.class }) public class TestReplicationFetchServers extends TestReplicationBase { + private static final Logger LOG = LoggerFactory.getLogger(TestReplicationFetchServers.class); @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestReplicationFetchServers.class); + private static HReplicationServer replicationServer; + private static AtomicBoolean fetchFlag = new AtomicBoolean(false); public static class MyObserver implements MasterCoprocessor, MasterObserver { @@ -77,6 +81,17 @@ public class TestReplicationFetchServers extends TestReplicationBase { public static void setUpBeforeClass() throws Exception { CONF2.set(MASTER_COPROCESSOR_CONF_KEY, MyObserver.class.getName()); TestReplicationBase.setUpBeforeClass(); + replicationServer = new HReplicationServer(CONF2); + replicationServer.start(); + UTIL2.waitFor(60000, () -> replicationServer.isOnline()); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TestReplicationBase.tearDownAfterClass(); + if (!replicationServer.isStopped()) { + replicationServer.stop("test"); + } } @Before @@ -85,15 +100,23 @@ public class TestReplicationFetchServers extends TestReplicationBase { } @Test - public void testMasterListReplicationPeerServers() throws IOException, ServiceException { + public void testMasterListReplicationPeerServers() throws IOException { AsyncClusterConnection conn = UTIL2.getAsyncConnection(); ServerName master = UTIL2.getAdmin().getMaster(); - MasterService.BlockingInterface masterStub = MasterService.newBlockingStub( - conn.getRpcClient().createBlockingRpcChannel(master, User.getCurrent(), 1000)); - ListReplicationSinkServersResponse resp = masterStub.listReplicationSinkServers( - null, ListReplicationSinkServersRequest.newBuilder().build()); - List<ServerName> servers = ProtobufUtil.toServerNameList(resp.getServerNameList()); - assertFalse(servers.isEmpty()); + // Wait for the replication server report to master + UTIL2.waitFor(60000, () -> { + List<ServerName> servers = new ArrayList<>(); + try { + MasterService.BlockingInterface masterStub = MasterService.newBlockingStub( + conn.getRpcClient().createBlockingRpcChannel(master, User.getCurrent(), 1000)); + ListReplicationSinkServersResponse resp = masterStub.listReplicationSinkServers( + null, ListReplicationSinkServersRequest.newBuilder().build()); + servers = ProtobufUtil.toServerNameList(resp.getServerNameList()); + } catch (Exception e) { + LOG.debug("Failed to list replication servers", e); + } + return servers.size() == 1; + }); assertTrue(fetchFlag.get()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSink.java similarity index 89% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSink.java index 30660c6..d97667b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSink.java @@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.ReplicationServerManager; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.ReplicationServerSinkPeer; -import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.SinkPeer; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; @@ -64,13 +63,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; @Category({ReplicationTests.class, MediumTests.class}) -public class TestReplicationServer { +public class TestReplicationServerSink { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestReplicationServer.class); + HBaseClassTestRule.forClass(TestReplicationServerSink.class); - private static final Logger LOG = LoggerFactory.getLogger(TestReplicationServer.class); + private static final Logger LOG = LoggerFactory.getLogger(TestReplicationServerSink.class); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -126,6 +125,7 @@ public class TestReplicationServer { if (!replicationServer.isStopped()) { replicationServer.stop("test"); } + TEST_UTIL.waitFor(10000, () -> !replicationServer.isAlive()); replicationServer = null; replicationServerName = null; } @@ -145,22 +145,7 @@ public class TestReplicationServer { replicateWALEntryAndVerify(sinkPeer); } - /** - * Requests region server using {@link AsyncReplicationServerAdmin} - */ - @Test - public void testReplicateWAL2() throws Exception { - AsyncClusterConnection conn = - TEST_UTIL.getHBaseCluster().getMaster().getAsyncClusterConnection(); - ServerName rs = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().get(0) - .getRegionServer().getServerName(); - AsyncReplicationServerAdmin replAdmin = conn.getReplicationServerAdmin(rs); - - ReplicationServerSinkPeer sinkPeer = new ReplicationServerSinkPeer(rs, replAdmin); - replicateWALEntryAndVerify(sinkPeer); - } - - private void replicateWALEntryAndVerify(SinkPeer sinkPeer) throws Exception { + private void replicateWALEntryAndVerify(ReplicationServerSinkPeer sinkPeer) throws Exception { Entry[] entries = new Entry[BATCH_SIZE]; for(int i = 0; i < BATCH_SIZE; i++) { entries[i] = generateEdit(i, TABLENAME, Bytes.toBytes(i)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSource.java new file mode 100644 index 0000000..843e5b1 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSource.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestReplicationServerSource extends TestReplicationBase { + + @ClassRule public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReplicationServerSource.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestReplicationServerSource.class); + + private static HReplicationServer replicationServer; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + UTIL1.getConfiguration().setBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY, true); + TestReplicationBase.setUpBeforeClass(); + replicationServer = new HReplicationServer(UTIL1.getConfiguration()); + replicationServer.start(); + UTIL1.waitFor(60000, () -> replicationServer.isOnline()); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + replicationServer.stop("Tear down after test"); + TestReplicationBase.tearDownAfterClass(); + } + + @Test + public void test() throws Exception { + try { + // Only start one region server in source cluster + ServerName producer = UTIL1.getMiniHBaseCluster().getRegionServer(0).getServerName(); + replicationServer.startReplicationSource(producer, PEER_ID2); + } catch (Throwable e) { + LOG.info("Failed to start replicaiton source", e); + } + runSmallBatchTest(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 697a5ec..bd673bc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -139,7 +139,7 @@ public class TestReplicationSource { String queueId = "qid"; RegionServerServices rss = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); - rs.init(conf, null, null, manager, null, mockPeer, rss, queueId, null, + rs.init(conf, null, null, manager, null, mockPeer, rss, rss.getServerName(), queueId, null, p -> OptionalLong.empty(), new MetricsSource(queueId)); try { rs.startup(); @@ -177,8 +177,8 @@ public class TestReplicationSource { String queueId = "qid"; RegionServerServices rss = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); - rs.init(conf, null, null, manager, null, mockPeer, rss, queueId, - uuid, p -> OptionalLong.empty(), new MetricsSource(queueId)); + rs.init(conf, null, null, manager, null, mockPeer, rss, rss.getServerName(), queueId, uuid, + p -> OptionalLong.empty(), new MetricsSource(queueId)); try { rs.startup(); TEST_UTIL.waitFor(30000, () -> rs.getWalEntryFilter() != null); @@ -264,9 +264,9 @@ public class TestReplicationSource { Configuration testConf = HBaseConfiguration.create(); testConf.setInt("replication.source.maxretriesmultiplier", 1); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); - Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); - source.init(testConf, null, null, manager, null, mockPeer, null, "testPeer", - null, p -> OptionalLong.empty(), null); + Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); + source.init(testConf, null, null, manager, null, mockPeer, null, null, "testPeer", null, + p -> OptionalLong.empty(), null); ExecutorService executor = Executors.newSingleThreadExecutor(); Future<?> future = executor.submit( () -> source.terminate("testing source termination")); @@ -289,7 +289,7 @@ public class TestReplicationSource { ReplicationPeer mockPeer = mock(ReplicationPeer.class); Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); Configuration testConf = HBaseConfiguration.create(); - source.init(testConf, null, null, mockManager, null, mockPeer, null, + source.init(testConf, null, null, mockManager, null, mockPeer, null, null, "testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class)); ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null, conf, null, 0, null, source, null); @@ -315,7 +315,7 @@ public class TestReplicationSource { reader.addEntryToBatch(batch, mockEntry); reader.entryBatchQueue.put(batch); source.terminate("test"); - assertEquals(0, source.controller.getTotalBufferUsed().get()); + assertEquals(0, mockManager.getTotalBufferUsed().get()); } /** @@ -536,7 +536,7 @@ public class TestReplicationSource { String queueId = "qid"; RegionServerServices rss = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); - rs.init(conf, null, null, manager, null, mockPeer, rss, queueId, null, + rs.init(conf, null, null, manager, null, mockPeer, rss, rss.getServerName(), queueId, null, p -> OptionalLong.empty(), new MetricsSource(queueId)); return rss; } @@ -655,7 +655,7 @@ public class TestReplicationSource { TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); ReplicationSource source = new ReplicationSource(); - source.init(conf, null, null, manager, null, mockPeer, rss, id, null, + source.init(conf, null, null, manager, null, mockPeer, rss, rss.getServerName(), id, null, p -> OptionalLong.empty(), metrics); final Path log1 = new Path(logDir, "log-walgroup-a.8"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 44914a5..e6b745e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -414,7 +414,8 @@ public abstract class TestReplicationSourceManager { assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group)); ReplicationSourceInterface source = new ReplicationSource(); source.init(conf, fs, null, manager, manager.getQueueStorage(), rp1.getPeer("1"), - manager.getServer(), id, null, p -> OptionalLong.empty(), null); + manager.getServer(), manager.getServer().getServerName(), id, null, p -> OptionalLong.empty(), + null); source.cleanOldWALs(file2, false); // log1 should be deleted assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group)); @@ -630,16 +631,16 @@ public abstract class TestReplicationSourceManager { ReplicationSourceInterface source = new ReplicationSource(); source.init(conf, fs, null, manager, manager.getQueueStorage(), - mockReplicationPeerForSyncReplication(peerId2), manager.getServer(), peerId2, null, - p -> OptionalLong.empty(), null); + mockReplicationPeerForSyncReplication(peerId2), manager.getServer(), + manager.getServer().getServerName(), peerId2, null, p -> OptionalLong.empty(), null); source.cleanOldWALs(walName, true); // still there if peer id does not match assertTrue(fs.exists(remoteWAL)); source = new ReplicationSource(); source.init(conf, fs, null, manager, manager.getQueueStorage(), - mockReplicationPeerForSyncReplication(slaveId), manager.getServer(), slaveId, null, - p -> OptionalLong.empty(), null); + mockReplicationPeerForSyncReplication(slaveId), manager.getServer(), + manager.getServer().getServerName(), slaveId, null, p -> OptionalLong.empty(), null); source.cleanOldWALs(walName, true); assertFalse(fs.exists(remoteWAL)); } finally { @@ -819,9 +820,10 @@ public abstract class TestReplicationSourceManager { @Override public void init(Configuration conf, FileSystem fs, Path walDir, - ReplicationSourceController overallController, ReplicationQueueStorage rq, ReplicationPeer rp, - Server server, String peerClusterId, UUID clusterId, - WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { + ReplicationSourceController overallController, ReplicationQueueStorage queueStorage, + ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId, + UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) + throws IOException { throw new IOException("Failing deliberately"); } }