This is an automated email from the ASF dual-hosted git repository. sunxin pushed a commit to branch HBASE-24666 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 4718d24699e8129c8b1fa4d0acf236137248d492 Author: Guanghao Zhang <zg...@apache.org> AuthorDate: Sun Sep 20 09:02:53 2020 +0800 HBASE-24998 Introduce a ReplicationSourceOverallController interface and decouple ReplicationSourceManager and ReplicationSource (#2364) Signed-off-by: meiyi <myime...@gmail.com> --- .../java/org/apache/hadoop/hbase/HConstants.java | 2 + .../hadoop/hbase/regionserver/RSRpcServices.java | 4 +- .../replication/ReplicationSourceController.java | 32 +++++++++----- .../regionserver/RecoveredReplicationSource.java | 18 ++++---- .../regionserver/ReplicationSource.java | 35 ++++++--------- .../regionserver/ReplicationSourceInterface.java | 25 +++++++---- .../regionserver/ReplicationSourceManager.java | 51 +++++++++++++--------- .../regionserver/ReplicationSourceShipper.java | 4 +- .../regionserver/ReplicationSourceWALReader.java | 13 +++--- .../hbase/replication/ReplicationSourceDummy.java | 21 +++++---- .../regionserver/TestBasicWALEntryStream.java | 15 ++++--- .../regionserver/TestReplicationSource.java | 2 +- .../regionserver/TestReplicationSourceManager.java | 3 +- 13 files changed, 125 insertions(+), 100 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 10a38f6..6cde48d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -994,6 +994,8 @@ public final class HConstants { /* * cluster replication constants. */ + public static final String REPLICATION_OFFLOAD_ENABLE_KEY = "hbase.replication.offload.enabled"; + public static final boolean REPLICATION_OFFLOAD_ENABLE_DEFAULT = false; public static final String REPLICATION_SOURCE_SERVICE_CLASSNAME = "hbase.replication.source.service"; public static final String REPLICATION_SERVICE_CLASSNAME_DEFAULT = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index c1f447c..72fea23 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -258,6 +258,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuo import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService; import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; @@ -271,7 +273,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe @SuppressWarnings("deprecation") public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction, - ConfigurationObserver { + ConfigurationObserver, ReplicationServerService.BlockingInterface { private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class); /** RPC scheduler to use for the region server. */ diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceController.java similarity index 50% rename from hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceController.java index 5c21e1e..5bb9dd6 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceController.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,21 +17,32 @@ */ package org.apache.hadoop.hbase.replication; -import org.apache.hadoop.hbase.ServerName; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource; +import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource; import org.apache.yetus.audience.InterfaceAudience; /** - * The replication listener interface can be implemented if a class needs to subscribe to events - * generated by the ReplicationTracker. These events include things like addition/deletion of peer - * clusters or failure of a local region server. To receive events, the class also needs to register - * itself with a Replication Tracker. + * Used to control all replication sources inside one RegionServer or ReplicationServer. + * Used by {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSource} or + * {@link RecoveredReplicationSource}. */ @InterfaceAudience.Private -public interface ReplicationListener { +public interface ReplicationSourceController { + + /** + * Returns the maximum size in bytes of edits held in memory which are pending replication + * across all sources inside this RegionServer or ReplicationServer. + */ + long getTotalBufferLimit(); + + AtomicLong getTotalBufferUsed(); + + MetricsReplicationGlobalSourceSource getGlobalMetrics(); /** - * A region server has been removed from the local cluster - * @param regionServer the removed region server + * Call this when the recovered replication source replicated all WALs. */ - public void regionServerRemoved(ServerName regionServer); + void finishRecoveredSource(RecoveredReplicationSource src); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index abbc046..7cb159e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationSourceController; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; @@ -45,18 +46,15 @@ public class RecoveredReplicationSource extends ReplicationSource { private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class); - private Path walDir; - private String actualPeerId; @Override - public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager, - ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, - String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider, - MetricsSource metrics) throws IOException { - super.init(conf, fs, walDir, manager, queueStorage, replicationPeer, server, peerClusterZnode, - clusterId, walFileLengthProvider, metrics); - this.walDir = walDir; + public void init(Configuration conf, FileSystem fs, Path walDir, + ReplicationSourceController overallController, ReplicationQueueStorage queueStorage, + ReplicationPeer replicationPeer, Server server, String peerClusterZnode, UUID clusterId, + WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { + super.init(conf, fs, walDir, overallController, queueStorage, replicationPeer, server, + peerClusterZnode, clusterId, walFileLengthProvider, metrics); this.actualPeerId = this.replicationQueueInfo.getPeerId(); } @@ -149,7 +147,7 @@ public class RecoveredReplicationSource extends ReplicationSource { void tryFinish() { if (workerThreads.isEmpty()) { this.getSourceMetrics().clear(); - manager.finishRecoveredSource(this); + controller.finishRecoveredSource(this); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index cfcc837..27f2ce7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationSourceController; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter; @@ -96,8 +97,9 @@ public class ReplicationSource implements ReplicationSourceInterface { protected Configuration conf; protected ReplicationQueueInfo replicationQueueInfo; - // The manager of all sources to which we ping back our progress - ReplicationSourceManager manager; + protected Path walDir; + + protected ReplicationSourceController controller; // Should we stop everything? protected Server server; // How long should we sleep for each retry @@ -181,23 +183,14 @@ public class ReplicationSource implements ReplicationSourceInterface { this.baseFilterOutWALEntries = Collections.unmodifiableList(baseFilterOutWALEntries); } - /** - * Instantiation method used by region servers - * @param conf configuration to use - * @param fs file system to use - * @param manager replication manager to ping to - * @param server the server for this region server - * @param queueId the id of our replication queue - * @param clusterId unique UUID for the cluster - * @param metrics metrics for replication source - */ @Override - public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager, - ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, - String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, - MetricsSource metrics) throws IOException { + public void init(Configuration conf, FileSystem fs, Path walDir, + ReplicationSourceController overallController, ReplicationQueueStorage queueStorage, + ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, + WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { this.server = server; this.conf = HBaseConfiguration.create(conf); + this.walDir = walDir; this.waitOnEndpointSeconds = this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS); decorateConf(); @@ -209,7 +202,7 @@ public class ReplicationSource implements ReplicationSourceInterface { this.logQueue = new ReplicationSourceLogQueue(conf, metrics, this); this.queueStorage = queueStorage; this.replicationPeer = replicationPeer; - this.manager = manager; + this.controller = overallController; this.fs = fs; this.metrics = metrics; this.clusterId = clusterId; @@ -336,9 +329,9 @@ public class ReplicationSource implements ReplicationSourceInterface { Threads.setDaemonThreadRunning( walReader, Thread.currentThread().getName() + ".replicationSource.wal-reader." + walGroupId + "," + queueId, - (t,e) -> this.uncaughtException(t, e, this.manager, this.getPeerId())); + (t,e) -> this.uncaughtException(t, e, null, this.getPeerId())); worker.setWALReader(walReader); - worker.startup((t,e) -> this.uncaughtException(t, e, this.manager, this.getPeerId())); + worker.startup((t,e) -> this.uncaughtException(t, e, null, this.getPeerId())); return worker; } }); @@ -766,9 +759,9 @@ public class ReplicationSource implements ReplicationSourceInterface { throttler.addPushSize(batchSize); } totalReplicatedEdits.addAndGet(entries.size()); - long newBufferUsed = manager.getTotalBufferUsed().addAndGet(-batchSize); + long newBufferUsed = controller.getTotalBufferUsed().addAndGet(-batchSize); // Record the new buffer usage - this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); + controller.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 77bba90..296bd27 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationSourceController; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.yetus.audience.InterfaceAudience; @@ -42,14 +43,22 @@ public interface ReplicationSourceInterface { /** * Initializer for the source * - * @param conf the configuration to use - * @param fs the file system to use - * @param server the server for this region server - */ - void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager, - ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, - String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, - MetricsSource metrics) throws IOException; + * @param conf configuration to use + * @param fs file system to use + * @param walDir the directory where the WAL is located + * @param overallController the overall controller of all replication sources + * @param queueStorage the replication queue storage + * @param replicationPeer the replication peer + * @param server the server which start and run this replication source + * @param queueId the id of our replication queue + * @param clusterId unique UUID for the cluster + * @param walFileLengthProvider used to get the WAL length + * @param metrics metrics for this replication source + */ + void init(Configuration conf, FileSystem fs, Path walDir, + ReplicationSourceController overallController, ReplicationQueueStorage queueStorage, + ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, + WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException; /** * Add a log to the list of logs to replicate diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index f502a65..b6cb087 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerImpl; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationSourceController; import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -93,7 +94,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto * </ul> */ @InterfaceAudience.Private -public class ReplicationSourceManager { +public class ReplicationSourceManager implements ReplicationSourceController { private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class); // all the sources that read this RS's logs and every peer only has one replication source private final ConcurrentMap<String, ReplicationSourceInterface> sources; @@ -134,12 +135,6 @@ public class ReplicationSourceManager { private AtomicLong totalBufferUsed = new AtomicLong(); - // How long should we sleep for each retry when deleting remote wal files for sync replication - // peer. - private final long sleepForRetries; - // Maximum number of retries before taking bold actions when deleting remote wal files for sync - // replication peer. - private final int maxRetriesMultiplier; // Total buffer size on this RegionServer for holding batched edits to be shipped. private final long totalBufferLimit; private final MetricsReplicationGlobalSourceSource globalMetrics; @@ -155,6 +150,12 @@ public class ReplicationSourceManager { AtomicReference<ReplicationSourceInterface> catalogReplicationSource = new AtomicReference<>(); /** + * When enable replication offload, will not create replication source and only write WAL to + * replication queue storage. The replication source will be started by ReplicationServer. + */ + private final boolean replicationOffload; + + /** * Creates a replication manager and sets the watch on all the other registered region servers * @param queueStorage the interface for manipulating replication queues * @param conf the configuration to use @@ -197,12 +198,11 @@ public class ReplicationSourceManager { this.latestPaths = new HashMap<>(); this.replicationForBulkLoadDataEnabled = conf.getBoolean( HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); - this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000); - this.maxRetriesMultiplier = - this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60); this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); this.globalMetrics = globalMetrics; + this.replicationOffload = conf.getBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY, + HConstants.REPLICATION_OFFLOAD_ENABLE_DEFAULT); } /** @@ -338,7 +338,9 @@ public class ReplicationSourceManager { if (peerConfig.isSyncReplication()) { syncReplicationPeerMappingManager.add(peer.getId(), peerConfig); } - src.startup(); + if (!replicationOffload) { + src.startup(); + } return src; } @@ -431,7 +433,9 @@ public class ReplicationSourceManager { .forEach(wal -> src.enqueueLog(new Path(this.logDir, wal))); } LOG.info("Startup replication source for " + src.getPeerId()); - src.startup(); + if (!replicationOffload) { + src.startup(); + } List<ReplicationSourceInterface> toStartup = new ArrayList<>(); // synchronized on oldsources to avoid race with NodeFailoverWorker @@ -454,8 +458,10 @@ public class ReplicationSourceManager { toStartup.add(recoveredReplicationSource); } } - for (ReplicationSourceInterface replicationSource : toStartup) { - replicationSource.startup(); + if (!replicationOffload) { + for (ReplicationSourceInterface replicationSource : toStartup) { + replicationSource.startup(); + } } } @@ -473,7 +479,8 @@ public class ReplicationSourceManager { return true; } - void finishRecoveredSource(ReplicationSourceInterface src) { + @Override + public void finishRecoveredSource(RecoveredReplicationSource src) { synchronized (oldsources) { if (!removeRecoveredSource(src)) { return; @@ -487,8 +494,7 @@ public class ReplicationSourceManager { * Clear the metrics and related replication queue of the specified old source * @param src source to clear */ - void removeSource(ReplicationSourceInterface src) { - LOG.info("Done with the queue " + src.getQueueId()); + private void removeSource(ReplicationSourceInterface src) { this.sources.remove(src.getPeerId()); // Delete queue from storage and memory deleteQueue(src.getQueueId()); @@ -532,7 +538,7 @@ public class ReplicationSourceManager { } } - // public because of we call it in TestReplicationEmptyWALRecovery + @InterfaceAudience.Private public void preLogRoll(Path newLog) throws IOException { String logName = newLog.getName(); String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName); @@ -550,8 +556,8 @@ public class ReplicationSourceManager { } } - // public because of we call it in TestReplicationEmptyWALRecovery - public void postLogRoll(Path newLog) throws IOException { + @InterfaceAudience.Private + public void postLogRoll(Path newLog) { // This only updates the sources we own, not the recovered ones for (ReplicationSourceInterface source : this.sources.values()) { source.enqueueLog(newLog); @@ -758,6 +764,7 @@ public class ReplicationSourceManager { } } + @Override public AtomicLong getTotalBufferUsed() { return totalBufferUsed; } @@ -766,6 +773,7 @@ public class ReplicationSourceManager { * Returns the maximum size in bytes of edits held in memory which are pending replication * across all sources inside this RegionServer. */ + @Override public long getTotalBufferLimit() { return totalBufferLimit; } @@ -856,7 +864,8 @@ public class ReplicationSourceManager { return executor.getActiveCount(); } - MetricsReplicationGlobalSourceSource getGlobalMetrics() { + @Override + public MetricsReplicationGlobalSourceSource getGlobalMetrics() { return this.globalMetrics; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 35c4e54..b904af8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -368,8 +368,8 @@ public class ReplicationSourceShipper extends Thread { LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.", totalToDecrement.longValue()); } - long newBufferUsed = source.manager.getTotalBufferUsed() + long newBufferUsed = source.controller.getTotalBufferUsed() .addAndGet(-totalToDecrement.longValue()); - source.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); + source.controller.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index d148162..698fd1e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -318,10 +318,11 @@ class ReplicationSourceWALReader extends Thread { //returns false if we've already exceeded the global quota private boolean checkQuota() { // try not to go over total quota - if (source.manager.getTotalBufferUsed().get() > source.manager.getTotalBufferLimit()) { + if (source.controller.getTotalBufferUsed().get() > source.controller + .getTotalBufferLimit()) { LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B", - this.source.getPeerId(), source.manager.getTotalBufferUsed().get(), - source.manager.getTotalBufferLimit()); + this.source.getPeerId(), source.controller.getTotalBufferUsed().get(), + source.controller.getTotalBufferLimit()); Threads.sleep(sleepForRetries); return false; } @@ -449,10 +450,10 @@ class ReplicationSourceWALReader extends Thread { * @return true if we should clear buffer and push all */ private boolean acquireBufferQuota(long size) { - long newBufferUsed = source.manager.getTotalBufferUsed().addAndGet(size); + long newBufferUsed = source.controller.getTotalBufferUsed().addAndGet(size); // Record the new buffer usage - source.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); - return newBufferUsed >= source.manager.getTotalBufferLimit(); + source.controller.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); + return newBufferUsed >= source.controller.getTotalBufferLimit(); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index 42445a6..8a32e94 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; import org.apache.hadoop.hbase.replication.regionserver.WALEntryBatch; import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -40,21 +39,21 @@ import org.apache.hadoop.hbase.wal.WAL.Entry; public class ReplicationSourceDummy implements ReplicationSourceInterface { private ReplicationPeer replicationPeer; - private String peerClusterId; + private String queueId; private Path currentPath; private MetricsSource metrics; private WALFileLengthProvider walFileLengthProvider; private AtomicBoolean startup = new AtomicBoolean(false); @Override - public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager, - ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId, - UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) - throws IOException { - this.peerClusterId = peerClusterId; + public void init(Configuration conf, FileSystem fs, Path walDir, + ReplicationSourceController overallController, ReplicationQueueStorage queueStorage, + ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, + WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { + this.queueId = queueId; this.metrics = metrics; this.walFileLengthProvider = walFileLengthProvider; - this.replicationPeer = rp; + this.replicationPeer = replicationPeer; } @Override @@ -97,14 +96,14 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { @Override public String getQueueId() { - return peerClusterId; + return queueId; } @Override public String getPeerId() { - String[] parts = peerClusterId.split("-", 2); + String[] parts = queueId.split("-", 2); return parts.length != 1 ? - parts[0] : peerClusterId; + parts[0] : queueId; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java index 7402d82..616defa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; +import org.apache.hadoop.hbase.replication.ReplicationSourceController; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -271,19 +272,19 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase { when(source.getWALFileLengthProvider()).thenReturn(log); when(source.getServer()).thenReturn(mockServer); when(source.isRecovered()).thenReturn(recovered); - source.manager = mockReplicationSourceManager(); + source.controller = mockReplicationSourceController(); return source; } - private ReplicationSourceManager mockReplicationSourceManager() { - ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); + private ReplicationSourceController mockReplicationSourceController() { + ReplicationSourceController controller = Mockito.mock(ReplicationSourceController.class); MetricsReplicationGlobalSourceSource globalMetrics = Mockito.mock(MetricsReplicationGlobalSourceSource.class); - when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics); - when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); - when(mockSourceManager.getTotalBufferLimit()) + when(controller.getGlobalMetrics()).thenReturn(globalMetrics); + when(controller.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); + when(controller.getTotalBufferLimit()) .thenReturn((long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); - return mockSourceManager; + return controller; } private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 0309731..697a5ec 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -315,7 +315,7 @@ public class TestReplicationSource { reader.addEntryToBatch(batch, mockEntry); reader.entryBatchQueue.put(batch); source.terminate("test"); - assertEquals(0, source.manager.getTotalBufferUsed().get()); + assertEquals(0, source.controller.getTotalBufferUsed().get()); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index b74b76e..44914a5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationSourceController; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationUtils; @@ -818,7 +819,7 @@ public abstract class TestReplicationSourceManager { @Override public void init(Configuration conf, FileSystem fs, Path walDir, - ReplicationSourceManager manager, ReplicationQueueStorage rq, ReplicationPeer rp, + ReplicationSourceController overallController, ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { throw new IOException("Failing deliberately");