This is an automated email from the ASF dual-hosted git repository. zghao pushed a commit to branch HBASE-24666 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 0dc4d5db321cb5164e7c87511db2ca1b995509dc Author: Guanghao Zhang <zg...@apache.org> AuthorDate: Sun Sep 20 09:02:53 2020 +0800 HBASE-24998 Introduce a ReplicationSourceOverallController interface and decouple ReplicationSourceManager and ReplicationSource (#2364) Signed-off-by: meiyi <myime...@gmail.com> --- .../java/org/apache/hadoop/hbase/HConstants.java | 2 + .../hbase/replication/ReplicationListener.java | 2 +- .../replication/ReplicationSourceController.java | 31 +++-- .../regionserver/RecoveredReplicationSource.java | 18 ++- .../regionserver/ReplicationSource.java | 31 ++--- .../regionserver/ReplicationSourceInterface.java | 25 ++-- .../regionserver/ReplicationSourceManager.java | 141 +++++++++++---------- .../regionserver/ReplicationSourceWALReader.java | 13 +- .../hbase/replication/ReplicationSourceDummy.java | 21 ++- .../regionserver/TestReplicationSourceManager.java | 11 +- .../regionserver/TestWALEntryStream.java | 15 ++- 11 files changed, 167 insertions(+), 143 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 5b4b6fb..f61558d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -979,6 +979,8 @@ public final class HConstants { /* * cluster replication constants. 
*/ + public static final String REPLICATION_OFFLOAD_ENABLE_KEY = "hbase.replication.offload.enabled"; + public static final boolean REPLICATION_OFFLOAD_ENABLE_DEFAULT = false; public static final String REPLICATION_SOURCE_SERVICE_CLASSNAME = "hbase.replication.source.service"; public static final String REPLICATION_SERVICE_CLASSNAME_DEFAULT = diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java index f040bf9..6ecbb46 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java @@ -33,5 +33,5 @@ public interface ReplicationListener { * A region server has been removed from the local cluster * @param regionServer the removed region server */ - public void regionServerRemoved(String regionServer); + void regionServerRemoved(String regionServer); } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceController.java similarity index 50% copy from hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java copy to hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceController.java index f040bf9..5bb9dd6 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceController.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -18,20 +17,32 @@ */ package org.apache.hadoop.hbase.replication; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource; +import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource; import org.apache.yetus.audience.InterfaceAudience; /** - * The replication listener interface can be implemented if a class needs to subscribe to events - * generated by the ReplicationTracker. These events include things like addition/deletion of peer - * clusters or failure of a local region server. To receive events, the class also needs to register - * itself with a Replication Tracker. + * Used to control all replication sources inside one RegionServer or ReplicationServer. + * Used by {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSource} or + * {@link RecoveredReplicationSource}. */ @InterfaceAudience.Private -public interface ReplicationListener { +public interface ReplicationSourceController { + + /** + * Returns the maximum size in bytes of edits held in memory which are pending replication + * across all sources inside this RegionServer or ReplicationServer. + */ + long getTotalBufferLimit(); + + AtomicLong getTotalBufferUsed(); + + MetricsReplicationGlobalSourceSource getGlobalMetrics(); /** - * A region server has been removed from the local cluster - * @param regionServer the removed region server + * Call this when the recovered replication source replicated all WALs. 
*/ - public void regionServerRemoved(String regionServer); + void finishRecoveredSource(RecoveredReplicationSource src); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index 62685ee..e0b626c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationSourceController; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; @@ -45,18 +46,15 @@ public class RecoveredReplicationSource extends ReplicationSource { private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class); - private Path walDir; - private String actualPeerId; @Override - public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager, - ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, - String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider, - MetricsSource metrics) throws IOException { - super.init(conf, fs, walDir, manager, queueStorage, replicationPeer, server, peerClusterZnode, - clusterId, walFileLengthProvider, metrics); - this.walDir = walDir; + public void init(Configuration conf, FileSystem fs, Path walDir, + ReplicationSourceController overallController, ReplicationQueueStorage queueStorage, + ReplicationPeer replicationPeer, 
Server server, String peerClusterZnode, UUID clusterId, + WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { + super.init(conf, fs, walDir, overallController, queueStorage, replicationPeer, server, + peerClusterZnode, clusterId, walFileLengthProvider, metrics); this.actualPeerId = this.replicationQueueInfo.getPeerId(); } @@ -149,7 +147,7 @@ public class RecoveredReplicationSource extends ReplicationSource { void tryFinish() { if (workerThreads.isEmpty()) { this.getSourceMetrics().clear(); - manager.finishRecoveredSource(this); + controller.finishRecoveredSource(this); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index dc7f1a5..879c604 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationSourceController; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter; @@ -101,8 +102,9 @@ public class ReplicationSource implements ReplicationSourceInterface { protected Configuration conf; protected ReplicationQueueInfo replicationQueueInfo; - // The manager of all sources to which we ping back our progress - ReplicationSourceManager manager; + protected Path walDir; + + protected ReplicationSourceController controller; // Should we stop everything? 
protected Server server; // How long should we sleep for each retry @@ -187,23 +189,14 @@ public class ReplicationSource implements ReplicationSourceInterface { this.baseFilterOutWALEntries = Collections.unmodifiableList(baseFilterOutWALEntries); } - /** - * Instantiation method used by region servers - * @param conf configuration to use - * @param fs file system to use - * @param manager replication manager to ping to - * @param server the server for this region server - * @param queueId the id of our replication queue - * @param clusterId unique UUID for the cluster - * @param metrics metrics for replication source - */ @Override - public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager, - ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, - String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, - MetricsSource metrics) throws IOException { + public void init(Configuration conf, FileSystem fs, Path walDir, + ReplicationSourceController overallController, ReplicationQueueStorage queueStorage, + ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, + WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { this.server = server; this.conf = HBaseConfiguration.create(conf); + this.walDir = walDir; this.waitOnEndpointSeconds = this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS); decorateConf(); @@ -214,7 +207,7 @@ public class ReplicationSource implements ReplicationSourceInterface { this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32); this.queueStorage = queueStorage; this.replicationPeer = replicationPeer; - this.manager = manager; + this.controller = overallController; this.fs = fs; this.metrics = metrics; this.clusterId = clusterId; @@ -783,9 +776,9 @@ public class ReplicationSource implements ReplicationSourceInterface { throttler.addPushSize(batchSize); } 
totalReplicatedEdits.addAndGet(entries.size()); - long newBufferUsed = manager.getTotalBufferUsed().addAndGet(-batchSize); + long newBufferUsed = controller.getTotalBufferUsed().addAndGet(-batchSize); // Record the new buffer usage - this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); + controller.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 321edc2..f3bf8a4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationSourceController; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.yetus.audience.InterfaceAudience; @@ -44,14 +45,22 @@ public interface ReplicationSourceInterface { /** * Initializer for the source * - * @param conf the configuration to use - * @param fs the file system to use - * @param server the server for this region server - */ - void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager, - ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, - String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, - MetricsSource metrics) throws IOException; + * @param conf configuration to use + * @param fs file system to use + * @param walDir the directory where the WAL is located + * @param 
overallController the overall controller of all replication sources + * @param queueStorage the replication queue storage + * @param replicationPeer the replication peer + * @param server the server which starts and runs this replication source + * @param queueId the id of our replication queue + * @param clusterId unique UUID for the cluster + * @param walFileLengthProvider used to get the WAL length + * @param metrics metrics for this replication source + */ + void init(Configuration conf, FileSystem fs, Path walDir, + ReplicationSourceController overallController, ReplicationQueueStorage queueStorage, + ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, + WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException; /** * Add a log to the list of logs to replicate diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 3212697..de9e21f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerImpl; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationSourceController; import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.util.Bytes; @@ -92,7 +93,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto * </ul> */ @InterfaceAudience.Private -public class 
ReplicationSourceManager implements ReplicationListener { +public class ReplicationSourceManager implements ReplicationListener, ReplicationSourceController { private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class); // all the sources that read this RS's logs and every peer only has one replication source private final ConcurrentMap<String, ReplicationSourceInterface> sources; @@ -126,12 +127,6 @@ public class ReplicationSourceManager implements ReplicationListener { private AtomicLong totalBufferUsed = new AtomicLong(); - // How long should we sleep for each retry when deleting remote wal files for sync replication - // peer. - private final long sleepForRetries; - // Maximum number of retries before taking bold actions when deleting remote wal files for sync - // replication peer. - private final int maxRetriesMultiplier; // Total buffer size on this RegionServer for holding batched edits to be shipped. private final long totalBufferLimit; private final MetricsReplicationGlobalSourceSource globalMetrics; @@ -139,6 +134,12 @@ public class ReplicationSourceManager implements ReplicationListener { private final Map<String, MetricsSource> sourceMetrics = new HashMap<>(); /** + * When replication offload is enabled, will not create replication source and will only write + * WAL to replication queue storage. The replication source will be started by ReplicationServer. 
+ */ + private final boolean replicationOffload; + + /** * Creates a replication manager and sets the watch on all the other registered region servers * @param queueStorage the interface for manipulating replication queues * @param replicationPeers @@ -186,12 +187,11 @@ public class ReplicationSourceManager implements ReplicationListener { this.latestPaths = new HashMap<>(); this.replicationForBulkLoadDataEnabled = conf.getBoolean( HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); - this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000); - this.maxRetriesMultiplier = - this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60); this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); this.globalMetrics = globalMetrics; + this.replicationOffload = conf.getBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY, + HConstants.REPLICATION_OFFLOAD_ENABLE_DEFAULT); } /** @@ -212,6 +212,47 @@ public class ReplicationSourceManager implements ReplicationListener { return this.executor.submit(this::adoptAbandonedQueues); } + @VisibleForTesting + @Override + public AtomicLong getTotalBufferUsed() { + return totalBufferUsed; + } + + @Override + public long getTotalBufferLimit() { + return totalBufferLimit; + } + + @Override + public void finishRecoveredSource(RecoveredReplicationSource src) { + synchronized (oldsources) { + if (!removeRecoveredSource(src)) { + return; + } + } + LOG.info("Finished recovering queue {} with the following stats: {}", src.getQueueId(), + src.getStats()); + } + + @Override + public MetricsReplicationGlobalSourceSource getGlobalMetrics() { + return this.globalMetrics; + } + + /** + * Clear the metrics and related replication queue of the specified old source + * @param src source to clear + */ + private boolean removeRecoveredSource(ReplicationSourceInterface src) { + if 
(!this.oldsources.remove(src)) { + return false; + } + LOG.info("Done with the recovered queue {}", src.getQueueId()); + // Delete queue from storage and memory + deleteQueue(src.getQueueId()); + return true; + } + private void adoptAbandonedQueues() { List<ServerName> currentReplicators = null; try { @@ -331,8 +372,7 @@ public class ReplicationSourceManager implements ReplicationListener { * @param peerId the id of the replication peer * @return the source that was created */ - @VisibleForTesting - ReplicationSourceInterface addSource(String peerId) throws IOException { + void addSource(String peerId) throws IOException { ReplicationPeer peer = replicationPeers.getPeer(peerId); ReplicationSourceInterface src = createSource(peerId, peer); // synchronized on latestPaths to avoid missing the new log @@ -354,8 +394,9 @@ public class ReplicationSourceManager implements ReplicationListener { if (peerConfig.isSyncReplication()) { syncReplicationPeerMappingManager.add(peer.getId(), peerConfig); } - src.startup(); - return src; + if (!replicationOffload) { + src.startup(); + } } /** @@ -373,7 +414,11 @@ public class ReplicationSourceManager implements ReplicationListener { * </p> * @param peerId the id of the sync replication peer */ - public void drainSources(String peerId) throws IOException, ReplicationException { + void drainSources(String peerId) throws IOException, ReplicationException { + if (replicationOffload) { + throw new ReplicationException( + "Should not add use sync replication when replication offload enabled"); + } String terminateMessage = "Sync replication peer " + peerId + " is transiting to STANDBY. 
Will close the previous replication source and open a new one"; ReplicationPeer peer = replicationPeers.getPeer(peerId); @@ -430,7 +475,7 @@ public class ReplicationSourceManager implements ReplicationListener { * replication queue storage and only to enqueue all logs to the new replication source * @param peerId the id of the replication peer */ - public void refreshSources(String peerId) throws ReplicationException, IOException { + void refreshSources(String peerId) throws ReplicationException, IOException { String terminateMessage = "Peer " + peerId + " state or config changed. Will close the previous replication source and open a new one"; ReplicationPeer peer = replicationPeers.getPeer(peerId); @@ -447,7 +492,9 @@ public class ReplicationSourceManager implements ReplicationListener { .forEach(wal -> src.enqueueLog(new Path(this.logDir, wal))); } LOG.info("Startup replication source for " + src.getPeerId()); - src.startup(); + if (!replicationOffload) { + src.startup(); + } List<ReplicationSourceInterface> toStartup = new ArrayList<>(); // synchronized on oldsources to avoid race with NodeFailoverWorker @@ -470,41 +517,18 @@ public class ReplicationSourceManager implements ReplicationListener { toStartup.add(recoveredReplicationSource); } } - for (ReplicationSourceInterface replicationSource : toStartup) { - replicationSource.startup(); - } - } - - /** - * Clear the metrics and related replication queue of the specified old source - * @param src source to clear - */ - private boolean removeRecoveredSource(ReplicationSourceInterface src) { - if (!this.oldsources.remove(src)) { - return false; - } - LOG.info("Done with the recovered queue {}", src.getQueueId()); - // Delete queue from storage and memory - deleteQueue(src.getQueueId()); - return true; - } - - void finishRecoveredSource(ReplicationSourceInterface src) { - synchronized (oldsources) { - if (!removeRecoveredSource(src)) { - return; + if (!replicationOffload) { + for (ReplicationSourceInterface 
replicationSource : toStartup) { + replicationSource.startup(); } } - LOG.info("Finished recovering queue {} with the following stats: {}", src.getQueueId(), - src.getStats()); } /** * Clear the metrics and related replication queue of the specified old source * @param src source to clear */ - void removeSource(ReplicationSourceInterface src) { - LOG.info("Done with the queue " + src.getQueueId()); + private void removeSource(ReplicationSourceInterface src) { this.sources.remove(src.getPeerId()); // Delete queue from storage and memory deleteQueue(src.getQueueId()); @@ -548,8 +572,7 @@ public class ReplicationSourceManager implements ReplicationListener { } } - // public because of we call it in TestReplicationEmptyWALRecovery - @VisibleForTesting + @InterfaceAudience.Private public void preLogRoll(Path newLog) throws IOException { String logName = newLog.getName(); String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName); @@ -567,9 +590,8 @@ public class ReplicationSourceManager implements ReplicationListener { } } - // public because of we call it in TestReplicationEmptyWALRecovery - @VisibleForTesting - public void postLogRoll(Path newLog) throws IOException { + @InterfaceAudience.Private + public void postLogRoll(Path newLog) { // This only updates the sources we own, not the recovered ones for (ReplicationSourceInterface source : this.sources.values()) { source.enqueueLog(newLog); @@ -739,7 +761,9 @@ public class ReplicationSourceManager implements ReplicationListener { LOG.trace("Enqueueing log from recovered queue for source: " + src.getQueueId()); src.enqueueLog(new Path(oldLogDir, wal)); } - src.startup(); + if (!replicationOffload) { + src.startup(); + } } } catch (IOException e) { // TODO manage it @@ -849,19 +873,6 @@ public class ReplicationSourceManager implements ReplicationListener { } } - @VisibleForTesting - public AtomicLong getTotalBufferUsed() { - return totalBufferUsed; - } - - /** - * Returns the maximum size in bytes of 
edits held in memory which are pending replication - * across all sources inside this RegionServer. - */ - public long getTotalBufferLimit() { - return totalBufferLimit; - } - /** * Get the directory where wals are archived * @return the directory where wals are archived @@ -967,10 +978,6 @@ public class ReplicationSourceManager implements ReplicationListener { return executor.getActiveCount(); } - MetricsReplicationGlobalSourceSource getGlobalMetrics() { - return this.globalMetrics; - } - @InterfaceAudience.Private Server getServer() { return this.server; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 65afe48..439915e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -268,10 +268,11 @@ class ReplicationSourceWALReader extends Thread { //returns false if we've already exceeded the global quota private boolean checkQuota() { // try not to go over total quota - if (source.manager.getTotalBufferUsed().get() > source.manager.getTotalBufferLimit()) { + if (source.controller.getTotalBufferUsed().get() > source.controller + .getTotalBufferLimit()) { LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B", - this.source.getPeerId(), source.manager.getTotalBufferUsed().get(), - source.manager.getTotalBufferLimit()); + this.source.getPeerId(), source.controller.getTotalBufferUsed().get(), + source.controller.getTotalBufferLimit()); Threads.sleep(sleepForRetries); return false; } @@ -400,10 +401,10 @@ class ReplicationSourceWALReader extends Thread { * @return true if we should clear buffer and push all */ private boolean acquireBufferQuota(long size) { - long 
newBufferUsed = source.manager.getTotalBufferUsed().addAndGet(size); + long newBufferUsed = source.controller.getTotalBufferUsed().addAndGet(size); // Record the new buffer usage - source.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); - return newBufferUsed >= source.manager.getTotalBufferLimit(); + source.controller.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); + return newBufferUsed >= source.controller.getTotalBufferLimit(); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index b75a7ed..66059c7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; import org.apache.hadoop.hbase.replication.regionserver.WALEntryBatch; import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -40,21 +39,21 @@ import org.apache.hadoop.hbase.wal.WAL.Entry; public class ReplicationSourceDummy implements ReplicationSourceInterface { private ReplicationPeer replicationPeer; - private String peerClusterId; + private String queueId; private Path currentPath; private MetricsSource metrics; private WALFileLengthProvider walFileLengthProvider; private AtomicBoolean startup = new AtomicBoolean(false); @Override - public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager, - ReplicationQueueStorage rq, 
ReplicationPeer rp, Server server, String peerClusterId, - UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) - throws IOException { - this.peerClusterId = peerClusterId; + public void init(Configuration conf, FileSystem fs, Path walDir, + ReplicationSourceController overallController, ReplicationQueueStorage queueStorage, + ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, + WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { + this.queueId = queueId; this.metrics = metrics; this.walFileLengthProvider = walFileLengthProvider; - this.replicationPeer = rp; + this.replicationPeer = replicationPeer; } @Override @@ -96,14 +95,14 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { @Override public String getQueueId() { - return peerClusterId; + return queueId; } @Override public String getPeerId() { - String[] parts = peerClusterId.split("-", 2); + String[] parts = queueId.split("-", 2); return parts.length != 1 ? 
- parts[0] : peerClusterId; + parts[0] : queueId; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 4b685ce..0e0353f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -43,6 +43,7 @@ import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -72,6 +73,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationSourceController; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationUtils; @@ -816,10 +818,11 @@ public abstract class TestReplicationSourceManager { static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy { - @Override public void init(Configuration conf, FileSystem fs, Path walDir, - ReplicationSourceManager manager, ReplicationQueueStorage rq, ReplicationPeer rp, - Server server, String peerClusterId, UUID clusterId, - WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { + @Override + public void init(Configuration conf, FileSystem fs, Path walDir, + ReplicationSourceController overallController, ReplicationQueueStorage 
queueStorage, + ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, + WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException{ throw new IOException("Failing deliberately"); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index 9410604..bafabb0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; +import org.apache.hadoop.hbase.replication.ReplicationSourceController; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; @@ -375,19 +376,19 @@ public class TestWALEntryStream { when(source.getWALFileLengthProvider()).thenReturn(log); when(source.getServer()).thenReturn(mockServer); when(source.isRecovered()).thenReturn(recovered); - source.manager = mockReplicationSourceManager(); + source.controller = mockReplicationSourceController(); return source; } - private ReplicationSourceManager mockReplicationSourceManager() { - ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); + private ReplicationSourceController mockReplicationSourceController() { + ReplicationSourceController controller = Mockito.mock(ReplicationSourceController.class); MetricsReplicationGlobalSourceSource globalMetrics = 
Mockito.mock(MetricsReplicationGlobalSourceSource.class); - when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics); - when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); - when(mockSourceManager.getTotalBufferLimit()) + when(controller.getGlobalMetrics()).thenReturn(globalMetrics); + when(controller.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); + when(controller.getTotalBufferLimit()) .thenReturn((long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); - return mockSourceManager; + return controller; } private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {