This is an automated email from the ASF dual-hosted git repository.

sunxin pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 4718d24699e8129c8b1fa4d0acf236137248d492
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Sun Sep 20 09:02:53 2020 +0800

    HBASE-24998 Introduce a ReplicationSourceOverallController interface and decouple ReplicationSourceManager and ReplicationSource (#2364)
    
    Signed-off-by: meiyi <myime...@gmail.com>
---
 .../java/org/apache/hadoop/hbase/HConstants.java   |  2 +
 .../hadoop/hbase/regionserver/RSRpcServices.java   |  4 +-
 .../replication/ReplicationSourceController.java   | 32 +++++++++-----
 .../regionserver/RecoveredReplicationSource.java   | 18 ++++----
 .../regionserver/ReplicationSource.java            | 35 ++++++---------
 .../regionserver/ReplicationSourceInterface.java   | 25 +++++++----
 .../regionserver/ReplicationSourceManager.java     | 51 +++++++++++++---------
 .../regionserver/ReplicationSourceShipper.java     |  4 +-
 .../regionserver/ReplicationSourceWALReader.java   | 13 +++---
 .../hbase/replication/ReplicationSourceDummy.java  | 21 +++++----
 .../regionserver/TestBasicWALEntryStream.java      | 15 ++++---
 .../regionserver/TestReplicationSource.java        |  2 +-
 .../regionserver/TestReplicationSourceManager.java |  3 +-
 13 files changed, 125 insertions(+), 100 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 10a38f6..6cde48d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -994,6 +994,8 @@ public final class HConstants {
   /*
    * cluster replication constants.
    */
+  public static final String REPLICATION_OFFLOAD_ENABLE_KEY = 
"hbase.replication.offload.enabled";
+  public static final boolean REPLICATION_OFFLOAD_ENABLE_DEFAULT = false;
   public static final String
       REPLICATION_SOURCE_SERVICE_CLASSNAME = 
"hbase.replication.source.service";
   public static final String REPLICATION_SERVICE_CLASSNAME_DEFAULT =
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index c1f447c..72fea23 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -258,6 +258,8 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuo
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
@@ -271,7 +273,7 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe
 @SuppressWarnings("deprecation")
 public class RSRpcServices implements HBaseRPCErrorHandler,
     AdminService.BlockingInterface, ClientService.BlockingInterface, 
PriorityFunction,
-    ConfigurationObserver {
+    ConfigurationObserver, ReplicationServerService.BlockingInterface {
   private static final Logger LOG = 
LoggerFactory.getLogger(RSRpcServices.class);
 
   /** RPC scheduler to use for the region server. */
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceController.java
similarity index 50%
rename from 
hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
rename to 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceController.java
index 5c21e1e..5bb9dd6 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceController.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,21 +17,32 @@
  */
 package org.apache.hadoop.hbase.replication;
 
-import org.apache.hadoop.hbase.ServerName;
+import java.util.concurrent.atomic.AtomicLong;
+
+import 
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import 
org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
- * The replication listener interface can be implemented if a class needs to 
subscribe to events
- * generated by the ReplicationTracker. These events include things like 
addition/deletion of peer
- * clusters or failure of a local region server. To receive events, the class 
also needs to register
- * itself with a Replication Tracker.
+ * Used to control all replication sources inside one RegionServer or 
ReplicationServer.
+ * Used by {@link 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSource} or
+ * {@link RecoveredReplicationSource}.
  */
 @InterfaceAudience.Private
-public interface ReplicationListener {
+public interface ReplicationSourceController {
+
+  /**
+   * Returns the maximum size in bytes of edits held in memory which are 
pending replication
+   * across all sources inside this RegionServer or ReplicationServer.
+   */
+  long getTotalBufferLimit();
+
+  AtomicLong getTotalBufferUsed();
+
+  MetricsReplicationGlobalSourceSource getGlobalMetrics();
 
   /**
-   * A region server has been removed from the local cluster
-   * @param regionServer the removed region server
+   * Call this when the recovered replication source has replicated all WALs.
    */
-  public void regionServerRemoved(ServerName regionServer);
+  void finishRecoveredSource(RecoveredReplicationSource src);
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index abbc046..7cb159e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -45,18 +46,15 @@ public class RecoveredReplicationSource extends 
ReplicationSource {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(RecoveredReplicationSource.class);
 
-  private Path walDir;
-
   private String actualPeerId;
 
   @Override
-  public void init(Configuration conf, FileSystem fs, Path walDir, 
ReplicationSourceManager manager,
-    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, 
Server server,
-    String peerClusterZnode, UUID clusterId, WALFileLengthProvider 
walFileLengthProvider,
-    MetricsSource metrics) throws IOException {
-    super.init(conf, fs, walDir, manager, queueStorage, replicationPeer, 
server, peerClusterZnode,
-      clusterId, walFileLengthProvider, metrics);
-    this.walDir = walDir;
+  public void init(Configuration conf, FileSystem fs, Path walDir,
+    ReplicationSourceController overallController, ReplicationQueueStorage 
queueStorage,
+    ReplicationPeer replicationPeer, Server server, String peerClusterZnode, 
UUID clusterId,
+    WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws 
IOException {
+    super.init(conf, fs, walDir, overallController, queueStorage, 
replicationPeer, server,
+      peerClusterZnode, clusterId, walFileLengthProvider, metrics);
     this.actualPeerId = this.replicationQueueInfo.getPeerId();
   }
 
@@ -149,7 +147,7 @@ public class RecoveredReplicationSource extends 
ReplicationSource {
   void tryFinish() {
     if (workerThreads.isEmpty()) {
       this.getSourceMetrics().clear();
-      manager.finishRecoveredSource(this);
+      controller.finishRecoveredSource(this);
     }
   }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index cfcc837..27f2ce7 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -58,6 +58,7 @@ import 
org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
@@ -96,8 +97,9 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
   protected Configuration conf;
   protected ReplicationQueueInfo replicationQueueInfo;
 
-  // The manager of all sources to which we ping back our progress
-  ReplicationSourceManager manager;
+  protected Path walDir;
+
+  protected ReplicationSourceController controller;
   // Should we stop everything?
   protected Server server;
   // How long should we sleep for each retry
@@ -181,23 +183,14 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
     this.baseFilterOutWALEntries = 
Collections.unmodifiableList(baseFilterOutWALEntries);
   }
 
-  /**
-   * Instantiation method used by region servers
-   * @param conf configuration to use
-   * @param fs file system to use
-   * @param manager replication manager to ping to
-   * @param server the server for this region server
-   * @param queueId the id of our replication queue
-   * @param clusterId unique UUID for the cluster
-   * @param metrics metrics for replication source
-   */
   @Override
-  public void init(Configuration conf, FileSystem fs, Path walDir, 
ReplicationSourceManager manager,
-      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, 
Server server,
-      String queueId, UUID clusterId, WALFileLengthProvider 
walFileLengthProvider,
-      MetricsSource metrics) throws IOException {
+  public void init(Configuration conf, FileSystem fs, Path walDir,
+    ReplicationSourceController overallController, ReplicationQueueStorage 
queueStorage,
+    ReplicationPeer replicationPeer, Server server, String queueId, UUID 
clusterId,
+    WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws 
IOException {
     this.server = server;
     this.conf = HBaseConfiguration.create(conf);
+    this.walDir = walDir;
     this.waitOnEndpointSeconds =
       this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, 
DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
     decorateConf();
@@ -209,7 +202,7 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
     this.logQueue = new ReplicationSourceLogQueue(conf, metrics, this);
     this.queueStorage = queueStorage;
     this.replicationPeer = replicationPeer;
-    this.manager = manager;
+    this.controller = overallController;
     this.fs = fs;
     this.metrics = metrics;
     this.clusterId = clusterId;
@@ -336,9 +329,9 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
         Threads.setDaemonThreadRunning(
             walReader, Thread.currentThread().getName()
             + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
-          (t,e) -> this.uncaughtException(t, e, this.manager, 
this.getPeerId()));
+          (t,e) -> this.uncaughtException(t, e, null, this.getPeerId()));
         worker.setWALReader(walReader);
-        worker.startup((t,e) -> this.uncaughtException(t, e, this.manager, 
this.getPeerId()));
+        worker.startup((t,e) -> this.uncaughtException(t, e, null, 
this.getPeerId()));
         return worker;
       }
     });
@@ -766,9 +759,9 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
       throttler.addPushSize(batchSize);
     }
     totalReplicatedEdits.addAndGet(entries.size());
-    long newBufferUsed = manager.getTotalBufferUsed().addAndGet(-batchSize);
+    long newBufferUsed = controller.getTotalBufferUsed().addAndGet(-batchSize);
     // Record the new buffer usage
-    
this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+    controller.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
   }
 
   @Override
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 77bba90..296bd27 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -42,14 +43,22 @@ public interface ReplicationSourceInterface {
   /**
    * Initializer for the source
    *
-   * @param conf   the configuration to use
-   * @param fs     the file system to use
-   * @param server the server for this region server
-   */
-  void init(Configuration conf, FileSystem fs, Path walDir, 
ReplicationSourceManager manager,
-    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, 
Server server,
-    String queueId, UUID clusterId, WALFileLengthProvider 
walFileLengthProvider,
-    MetricsSource metrics) throws IOException;
+   * @param conf configuration to use
+   * @param fs file system to use
+   * @param walDir the directory where the WAL is located
+   * @param overallController the overall controller of all replication sources
+   * @param queueStorage the replication queue storage
+   * @param replicationPeer the replication peer
+   * @param server the server which starts and runs this replication source
+   * @param queueId the id of our replication queue
+   * @param clusterId unique UUID for the cluster
+   * @param walFileLengthProvider used to get the WAL length
+   * @param metrics metrics for this replication source
+   */
+  void init(Configuration conf, FileSystem fs, Path walDir,
+    ReplicationSourceController overallController, ReplicationQueueStorage 
queueStorage,
+    ReplicationPeer replicationPeer, Server server, String queueId, UUID 
clusterId,
+    WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws 
IOException;
 
   /**
    * Add a log to the list of logs to replicate
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index f502a65..b6cb087 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -56,6 +56,7 @@ import 
org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -93,7 +94,7 @@ import 
org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
  * </ul>
  */
 @InterfaceAudience.Private
-public class ReplicationSourceManager {
+public class ReplicationSourceManager implements ReplicationSourceController {
   private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationSourceManager.class);
   // all the sources that read this RS's logs and every peer only has one 
replication source
   private final ConcurrentMap<String, ReplicationSourceInterface> sources;
@@ -134,12 +135,6 @@ public class ReplicationSourceManager {
 
   private AtomicLong totalBufferUsed = new AtomicLong();
 
-  // How long should we sleep for each retry when deleting remote wal files 
for sync replication
-  // peer.
-  private final long sleepForRetries;
-  // Maximum number of retries before taking bold actions when deleting remote 
wal files for sync
-  // replication peer.
-  private final int maxRetriesMultiplier;
   // Total buffer size on this RegionServer for holding batched edits to be 
shipped.
   private final long totalBufferLimit;
   private final MetricsReplicationGlobalSourceSource globalMetrics;
@@ -155,6 +150,12 @@ public class ReplicationSourceManager {
   AtomicReference<ReplicationSourceInterface> catalogReplicationSource = new 
AtomicReference<>();
 
   /**
+   * When replication offload is enabled, replication sources are not started here; WALs are only
+   * written to the replication queue storage, and the sources are started by ReplicationServer.
+   */
+  private final boolean replicationOffload;
+
+  /**
    * Creates a replication manager and sets the watch on all the other 
registered region servers
    * @param queueStorage the interface for manipulating replication queues
    * @param conf the configuration to use
@@ -197,12 +198,11 @@ public class ReplicationSourceManager {
     this.latestPaths = new HashMap<>();
     this.replicationForBulkLoadDataEnabled = conf.getBoolean(
       HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, 
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
-    this.sleepForRetries = 
this.conf.getLong("replication.source.sync.sleepforretries", 1000);
-    this.maxRetriesMultiplier =
-      this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60);
     this.totalBufferLimit = 
conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
         HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
     this.globalMetrics = globalMetrics;
+    this.replicationOffload = 
conf.getBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY,
+      HConstants.REPLICATION_OFFLOAD_ENABLE_DEFAULT);
   }
 
   /**
@@ -338,7 +338,9 @@ public class ReplicationSourceManager {
     if (peerConfig.isSyncReplication()) {
       syncReplicationPeerMappingManager.add(peer.getId(), peerConfig);
     }
-    src.startup();
+    if (!replicationOffload) {
+      src.startup();
+    }
     return src;
   }
 
@@ -431,7 +433,9 @@ public class ReplicationSourceManager {
         .forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
     }
     LOG.info("Startup replication source for " + src.getPeerId());
-    src.startup();
+    if (!replicationOffload) {
+      src.startup();
+    }
 
     List<ReplicationSourceInterface> toStartup = new ArrayList<>();
     // synchronized on oldsources to avoid race with NodeFailoverWorker
@@ -454,8 +458,10 @@ public class ReplicationSourceManager {
         toStartup.add(recoveredReplicationSource);
       }
     }
-    for (ReplicationSourceInterface replicationSource : toStartup) {
-      replicationSource.startup();
+    if (!replicationOffload) {
+      for (ReplicationSourceInterface replicationSource : toStartup) {
+        replicationSource.startup();
+      }
     }
   }
 
@@ -473,7 +479,8 @@ public class ReplicationSourceManager {
     return true;
   }
 
-  void finishRecoveredSource(ReplicationSourceInterface src) {
+  @Override
+  public void finishRecoveredSource(RecoveredReplicationSource src) {
     synchronized (oldsources) {
       if (!removeRecoveredSource(src)) {
         return;
@@ -487,8 +494,7 @@ public class ReplicationSourceManager {
    * Clear the metrics and related replication queue of the specified old 
source
    * @param src source to clear
    */
-  void removeSource(ReplicationSourceInterface src) {
-    LOG.info("Done with the queue " + src.getQueueId());
+  private void removeSource(ReplicationSourceInterface src) {
     this.sources.remove(src.getPeerId());
     // Delete queue from storage and memory
     deleteQueue(src.getQueueId());
@@ -532,7 +538,7 @@ public class ReplicationSourceManager {
     }
   }
 
-  // public because of we call it in TestReplicationEmptyWALRecovery
+  @InterfaceAudience.Private
   public void preLogRoll(Path newLog) throws IOException {
     String logName = newLog.getName();
     String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
@@ -550,8 +556,8 @@ public class ReplicationSourceManager {
     }
   }
 
-  // public because of we call it in TestReplicationEmptyWALRecovery
-  public void postLogRoll(Path newLog) throws IOException {
+  @InterfaceAudience.Private
+  public void postLogRoll(Path newLog) {
     // This only updates the sources we own, not the recovered ones
     for (ReplicationSourceInterface source : this.sources.values()) {
       source.enqueueLog(newLog);
@@ -758,6 +764,7 @@ public class ReplicationSourceManager {
     }
   }
 
+  @Override
   public AtomicLong getTotalBufferUsed() {
     return totalBufferUsed;
   }
@@ -766,6 +773,7 @@ public class ReplicationSourceManager {
    * Returns the maximum size in bytes of edits held in memory which are 
pending replication
    * across all sources inside this RegionServer.
    */
+  @Override
   public long getTotalBufferLimit() {
     return totalBufferLimit;
   }
@@ -856,7 +864,8 @@ public class ReplicationSourceManager {
     return executor.getActiveCount();
   }
 
-  MetricsReplicationGlobalSourceSource getGlobalMetrics() {
+  @Override
+  public MetricsReplicationGlobalSourceSource getGlobalMetrics() {
     return this.globalMetrics;
   }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 35c4e54..b904af8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -368,8 +368,8 @@ public class ReplicationSourceShipper extends Thread {
       LOG.trace("Decrementing totalBufferUsed by {}B while stopping 
Replication WAL Readers.",
         totalToDecrement.longValue());
     }
-    long newBufferUsed = source.manager.getTotalBufferUsed()
+    long newBufferUsed = source.controller.getTotalBufferUsed()
       .addAndGet(-totalToDecrement.longValue());
-    
source.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+    
source.controller.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
   }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index d148162..698fd1e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -318,10 +318,11 @@ class ReplicationSourceWALReader extends Thread {
   //returns false if we've already exceeded the global quota
   private boolean checkQuota() {
     // try not to go over total quota
-    if (source.manager.getTotalBufferUsed().get() > 
source.manager.getTotalBufferLimit()) {
+    if (source.controller.getTotalBufferUsed().get() > source.controller
+      .getTotalBufferLimit()) {
       LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B 
exceeds limit {}B",
-        this.source.getPeerId(), source.manager.getTotalBufferUsed().get(),
-        source.manager.getTotalBufferLimit());
+        this.source.getPeerId(), source.controller.getTotalBufferUsed().get(),
+        source.controller.getTotalBufferLimit());
       Threads.sleep(sleepForRetries);
       return false;
     }
@@ -449,10 +450,10 @@ class ReplicationSourceWALReader extends Thread {
    * @return true if we should clear buffer and push all
    */
   private boolean acquireBufferQuota(long size) {
-    long newBufferUsed = source.manager.getTotalBufferUsed().addAndGet(size);
+    long newBufferUsed = 
source.controller.getTotalBufferUsed().addAndGet(size);
     // Record the new buffer usage
-    
source.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
-    return newBufferUsed >= source.manager.getTotalBufferLimit();
+    
source.controller.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+    return newBufferUsed >= source.controller.getTotalBufferLimit();
   }
 
   /**
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 42445a6..8a32e94 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
-import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.apache.hadoop.hbase.replication.regionserver.WALEntryBatch;
 import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -40,21 +39,21 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
 public class ReplicationSourceDummy implements ReplicationSourceInterface {
 
   private ReplicationPeer replicationPeer;
-  private String peerClusterId;
+  private String queueId;
   private Path currentPath;
   private MetricsSource metrics;
   private WALFileLengthProvider walFileLengthProvider;
   private AtomicBoolean startup = new AtomicBoolean(false);
 
   @Override
-  public void init(Configuration conf, FileSystem fs, Path walDir, 
ReplicationSourceManager manager,
-    ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String 
peerClusterId,
-    UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource 
metrics)
-    throws IOException {
-    this.peerClusterId = peerClusterId;
+  public void init(Configuration conf, FileSystem fs, Path walDir,
+    ReplicationSourceController overallController, ReplicationQueueStorage 
queueStorage,
+    ReplicationPeer replicationPeer, Server server, String queueId, UUID 
clusterId,
+    WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws 
IOException {
+    this.queueId = queueId;
     this.metrics = metrics;
     this.walFileLengthProvider = walFileLengthProvider;
-    this.replicationPeer = rp;
+    this.replicationPeer = replicationPeer;
   }
 
   @Override
@@ -97,14 +96,14 @@ public class ReplicationSourceDummy implements 
ReplicationSourceInterface {
 
   @Override
   public String getQueueId() {
-    return peerClusterId;
+    return queueId;
   }
 
   @Override
   public String getPeerId() {
-    String[] parts = peerClusterId.split("-", 2);
+    String[] parts = queueId.split("-", 2);
     return parts.length != 1 ?
-        parts[0] : peerClusterId;
+        parts[0] : queueId;
   }
 
   @Override
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
index 7402d82..616defa 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -271,19 +272,19 @@ public abstract class TestBasicWALEntryStream extends 
WALEntryStreamTestBase {
     when(source.getWALFileLengthProvider()).thenReturn(log);
     when(source.getServer()).thenReturn(mockServer);
     when(source.isRecovered()).thenReturn(recovered);
-    source.manager = mockReplicationSourceManager();
+    source.controller = mockReplicationSourceController();
     return source;
   }
 
-  private ReplicationSourceManager mockReplicationSourceManager() {
-    ReplicationSourceManager mockSourceManager = 
Mockito.mock(ReplicationSourceManager.class);
+  private ReplicationSourceController mockReplicationSourceController() {
+    ReplicationSourceController controller = 
Mockito.mock(ReplicationSourceController.class);
     MetricsReplicationGlobalSourceSource globalMetrics =
       Mockito.mock(MetricsReplicationGlobalSourceSource.class);
-    when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
-    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
-    when(mockSourceManager.getTotalBufferLimit())
+    when(controller.getGlobalMetrics()).thenReturn(globalMetrics);
+    when(controller.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    when(controller.getTotalBufferLimit())
       .thenReturn((long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
-    return mockSourceManager;
+    return controller;
   }
 
   private ReplicationSourceWALReader createReader(boolean recovered, 
Configuration conf) {
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 0309731..697a5ec 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -315,7 +315,7 @@ public class TestReplicationSource {
     reader.addEntryToBatch(batch, mockEntry);
     reader.entryBatchQueue.put(batch);
     source.terminate("test");
-    assertEquals(0, source.manager.getTotalBufferUsed().get());
+    assertEquals(0, source.controller.getTotalBufferUsed().get());
   }
 
   /**
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index b74b76e..44914a5 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
@@ -818,7 +819,7 @@ public abstract class TestReplicationSourceManager {
 
     @Override
     public void init(Configuration conf, FileSystem fs, Path walDir,
-      ReplicationSourceManager manager, ReplicationQueueStorage rq, 
ReplicationPeer rp,
+      ReplicationSourceController overallController, ReplicationQueueStorage 
rq, ReplicationPeer rp,
       Server server, String peerClusterId, UUID clusterId,
       WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) 
throws IOException {
       throw new IOException("Failing deliberately");

Reply via email to