HBASE-19636 All rs should already start work with the new peer change when replication peer procedure is finished

Signed-off-by: zhangduo <zhang...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/24eff344
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/24eff344
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/24eff344

Branch: refs/heads/HBASE-19397-branch-2
Commit: 24eff344ddc3541b7fa32401012a55861c432081
Parents: f42fc70
Author: Guanghao Zhang <zg...@apache.org>
Authored: Thu Jan 4 16:58:01 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Mon Feb 26 15:58:44 2018 +0800

----------------------------------------------------------------------
 .../replication/ReplicationPeerConfig.java      |   1 -
 .../hbase/replication/ReplicationPeerImpl.java  |   4 +-
 .../hbase/replication/ReplicationQueueInfo.java |  23 +-
 .../hbase/replication/ReplicationUtils.java     |  56 ++
 .../replication/TestReplicationStateZKImpl.java |  21 -
 .../regionserver/ReplicationSourceService.java  |   3 +-
 .../regionserver/PeerProcedureHandler.java      |   3 +
 .../regionserver/PeerProcedureHandlerImpl.java  |  50 +-
 .../RecoveredReplicationSource.java             |   6 +-
 .../RecoveredReplicationSourceShipper.java      |   8 +-
 .../replication/regionserver/Replication.java   |  11 +-
 .../regionserver/ReplicationSource.java         |  34 +-
 .../regionserver/ReplicationSourceFactory.java  |   4 +-
 .../ReplicationSourceInterface.java             |   8 +-
 .../regionserver/ReplicationSourceManager.java  | 827 ++++++++++---------
 .../regionserver/ReplicationSourceShipper.java  |   6 +-
 .../ReplicationSourceWALReader.java             |   2 +-
 .../replication/ReplicationSourceDummy.java     |   2 +-
 .../replication/TestNamespaceReplication.java   |  57 +-
 .../TestReplicationSourceManager.java           |   5 +-
 20 files changed, 622 insertions(+), 509 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/24eff344/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index fdae288..bf8d030 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;

http://git-wip-us.apache.org/repos/asf/hbase/blob/24eff344/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
index 3e17025..604e0bb 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -28,6 +27,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
 public class ReplicationPeerImpl implements ReplicationPeer {
+
   private final Configuration conf;
 
   private final String id;

http://git-wip-us.apache.org/repos/asf/hbase/blob/24eff344/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
index ecd888f..cd65f9b 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.ServerName;
 
 /**
- * This class is responsible for the parsing logic for a znode representing a queue.
+ * This class is responsible for the parsing logic for a queue id representing a queue.
  * It will extract the peerId if it's recovered as well as the dead region servers
  * that were part of the queue's history.
  */
@@ -38,21 +38,20 @@ public class ReplicationQueueInfo {
   private static final Logger LOG = LoggerFactory.getLogger(ReplicationQueueInfo.class);
 
   private final String peerId;
-  private final String peerClusterZnode;
+  private final String queueId;
   private boolean queueRecovered;
   // List of all the dead region servers that had this queue (if recovered)
   private List<ServerName> deadRegionServers = new ArrayList<>();
 
   /**
-   * The passed znode will be either the id of the peer cluster or
-   * the handling story of that queue in the form of id-servername-*
+   * The passed queueId will be either the id of the peer or the handling story of that queue
+   * in the form of id-servername-*
    */
-  public ReplicationQueueInfo(String znode) {
-    this.peerClusterZnode = znode;
-    String[] parts = znode.split("-", 2);
+  public ReplicationQueueInfo(String queueId) {
+    this.queueId = queueId;
+    String[] parts = queueId.split("-", 2);
     this.queueRecovered = parts.length != 1;
-    this.peerId = this.queueRecovered ?
-        parts[0] : peerClusterZnode;
+    this.peerId = this.queueRecovered ? parts[0] : queueId;
     if (parts.length >= 2) {
       // extract dead servers
       extractDeadServersFromZNodeString(parts[1], this.deadRegionServers);
@@ -60,7 +59,7 @@ public class ReplicationQueueInfo {
   }
 
   /**
-   * Parse dead server names from znode string servername can contain "-" such as
+   * Parse dead server names from queue id. servername can contain "-" such as
    * "ip-10-46-221-101.ec2.internal", so we need skip some "-" during parsing for the following
    * cases: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125-&lt;server name>-...
    */
@@ -119,8 +118,8 @@ public class ReplicationQueueInfo {
     return this.peerId;
   }
 
-  public String getPeerClusterZnode() {
-    return this.peerClusterZnode;
+  public String getQueueId() {
+    return this.queueId;
   }
 
   public boolean isQueueRecovered() {

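The rename from znode to queue id above is purely terminological; the parsing
contract is unchanged. A minimal illustration of the two accepted queue id forms,
written against the constructor in this hunk (the getDeadRegionServers() accessor
is assumed from the existing class; it is not shown in this diff):

    // Normal source: the queue id is simply the peer id.
    ReplicationQueueInfo normal = new ReplicationQueueInfo("2");
    assert "2".equals(normal.getPeerId()) && !normal.isQueueRecovered();

    // Recovered source: peerId-servername,port,startcode. Server names may
    // themselves contain "-" (e.g. ip-10-46-221-101.ec2.internal), which is why
    // extractDeadServersFromZNodeString must skip hyphens carefully.
    ReplicationQueueInfo recovered =
        new ReplicationQueueInfo("2-ip-10-46-221-101.ec2.internal,52170,1364333181125");
    assert "2".equals(recovered.getPeerId()) && recovered.isQueueRecovered();
    assert !recovered.getDeadRegionServers().isEmpty();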
http://git-wip-us.apache.org/repos/asf/hbase/blob/24eff344/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index be70e6e..ca871ea 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -18,12 +18,16 @@
 package org.apache.hadoop.hbase.replication;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -76,4 +80,56 @@ public final class ReplicationUtils {
       queueStorage.removeReplicatorIfQueueIsEmpty(replicator);
     }
   }
+
+  private static boolean isCollectionEqual(Collection<String> c1, Collection<String> c2) {
+    if (c1 == null) {
+      return c2 == null;
+    }
+    if (c2 == null) {
+      return false;
+    }
+    return c1.size() == c2.size() && c1.containsAll(c2);
+  }
+
+  private static boolean isNamespacesEqual(Set<String> ns1, Set<String> ns2) {
+    return isCollectionEqual(ns1, ns2);
+  }
+
+  private static boolean isTableCFsEqual(Map<TableName, List<String>> tableCFs1,
+      Map<TableName, List<String>> tableCFs2) {
+    if (tableCFs1 == null) {
+      return tableCFs2 == null;
+    }
+    if (tableCFs2 == null) {
+      return false;
+    }
+    if (tableCFs1.size() != tableCFs2.size()) {
+      return false;
+    }
+    for (Map.Entry<TableName, List<String>> entry1 : tableCFs1.entrySet()) {
+      TableName table = entry1.getKey();
+      if (!tableCFs2.containsKey(table)) {
+        return false;
+      }
+      List<String> cfs1 = entry1.getValue();
+      List<String> cfs2 = tableCFs2.get(table);
+      if (!isCollectionEqual(cfs1, cfs2)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public static boolean isKeyConfigEqual(ReplicationPeerConfig rpc1, ReplicationPeerConfig rpc2) {
+    if (rpc1.replicateAllUserTables() != rpc2.replicateAllUserTables()) {
+      return false;
+    }
+    if (rpc1.replicateAllUserTables()) {
+      return isNamespacesEqual(rpc1.getExcludeNamespaces(), rpc2.getExcludeNamespaces()) &&
+        isTableCFsEqual(rpc1.getExcludeTableCFsMap(), rpc2.getExcludeTableCFsMap());
+    } else {
+      return isNamespacesEqual(rpc1.getNamespaces(), rpc2.getNamespaces()) &&
+        isTableCFsEqual(rpc1.getTableCFsMap(), rpc2.getTableCFsMap());
+    }
+  }
 }

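The new isKeyConfigEqual() compares only the "key" parts of a peer config: the
replicate-all flag plus the namespaces and table-CFs maps (or their exclude
variants). Cluster key and bandwidth are deliberately left out, so changing them
never forces a source restart. A rough usage sketch, built with the plain
ReplicationPeerConfig setters (illustrative only, not part of this commit):

    ReplicationPeerConfig oldConfig = new ReplicationPeerConfig();
    oldConfig.setReplicateAllUserTables(true);

    ReplicationPeerConfig newConfig = new ReplicationPeerConfig();
    newConfig.setReplicateAllUserTables(true);
    newConfig.setBandwidth(1024 * 1024); // bandwidth is not a "key" config

    // Same replication scope, so the region server keeps its running sources.
    assert ReplicationUtils.isKeyConfigEqual(oldConfig, newConfig);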
http://git-wip-us.apache.org/repos/asf/hbase/blob/24eff344/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
index 1830103..08178f4 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication;
 
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.ClusterId;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseZKTestingUtility;
@@ -38,8 +37,6 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @Category({ ReplicationTests.class, MediumTests.class })
 public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
@@ -48,8 +45,6 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestReplicationStateZKImpl.class);
 
-  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateZKImpl.class);
-
   private static Configuration conf;
   private static HBaseZKTestingUtility utility;
   private static ZKWatcher zkw;
@@ -97,20 +92,4 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
   public static void tearDownAfterClass() throws Exception {
     utility.shutdownMiniZKCluster();
   }
-
-  private static class WarnOnlyAbortable implements Abortable {
-
-    @Override
-    public void abort(String why, Throwable e) {
-      LOG.warn("TestReplicationStateZKImpl received abort, ignoring.  Reason: 
" + why);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(e.toString(), e);
-      }
-    }
-
-    @Override
-    public boolean isAborted() {
-      return false;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/24eff344/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
index 977dd15..23ba773 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/hbase/blob/24eff344/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
index b392985..65da9af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
@@ -23,6 +23,9 @@ import java.io.IOException;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.yetus.audience.InterfaceAudience;
 
+/**
+ * A handler for modifying replication peer in peer procedures.
+ */
 @InterfaceAudience.Private
 public interface PeerProcedureHandler {
 

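The interface itself only gains javadoc here, but it is the region-server-side
entry point for the peer procedures. A hypothetical dispatch mirroring the five
operations the handler covers (the switch is illustrative; the method names are
from this patch, and PeerModificationType is assumed from the procedure protos
in this branch):

    static void dispatch(PeerModificationType type, String peerId,
        PeerProcedureHandler handler) throws ReplicationException, IOException {
      switch (type) {
        case ADD_PEER:           handler.addPeer(peerId); break;
        case REMOVE_PEER:        handler.removePeer(peerId); break;
        case ENABLE_PEER:        handler.enablePeer(peerId); break;
        case DISABLE_PEER:       handler.disablePeer(peerId); break;
        case UPDATE_PEER_CONFIG: handler.updatePeerConfig(peerId); break;
        default: throw new IllegalArgumentException("Unknown type: " + type);
      }
    }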
http://git-wip-us.apache.org/repos/asf/hbase/blob/24eff344/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
index c09c6a0..ce8fdae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
@@ -15,21 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
 import java.util.concurrent.locks.Lock;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.util.KeyLocker;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @InterfaceAudience.Private
 public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
-  private static final Logger LOG = LoggerFactory.getLogger(PeerProcedureHandlerImpl.class);
 
   private final ReplicationSourceManager replicationSourceManager;
   private final KeyLocker<String> peersLock = new KeyLocker<>();
@@ -39,7 +38,7 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
   }
 
   @Override
-  public void addPeer(String peerId) throws ReplicationException, IOException {
+  public void addPeer(String peerId) throws IOException {
     Lock peerLock = peersLock.acquireLock(peerId);
     try {
       replicationSourceManager.addPeer(peerId);
@@ -49,7 +48,7 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
   }
 
   @Override
-  public void removePeer(String peerId) throws ReplicationException, IOException {
+  public void removePeer(String peerId) throws IOException {
     Lock peerLock = peersLock.acquireLock(peerId);
     try {
       if (replicationSourceManager.getReplicationPeers().getPeer(peerId) != null) {
@@ -60,35 +59,50 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
     }
   }
 
-  @Override
-  public void disablePeer(String peerId) throws ReplicationException, IOException {
+  private void refreshPeerState(String peerId) throws ReplicationException, IOException {
     PeerState newState;
     Lock peerLock = peersLock.acquireLock(peerId);
     try {
+      ReplicationPeerImpl peer = replicationSourceManager.getReplicationPeers().getPeer(peerId);
+      if (peer == null) {
+        throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
+      }
+      PeerState oldState = peer.getPeerState();
       newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
+      // RS need to start work with the new replication state change
+      if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) {
+        replicationSourceManager.refreshSources(peerId);
+      }
     } finally {
       peerLock.unlock();
     }
-    LOG.info("disable replication peer, id: {}, new state: {}", peerId, 
newState);
   }
 
   @Override
   public void enablePeer(String peerId) throws ReplicationException, IOException {
-    PeerState newState;
-    Lock peerLock = peersLock.acquireLock(peerId);
-    try {
-      newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
-    } finally {
-      peerLock.unlock();
-    }
-    LOG.info("enable replication peer, id: {}, new state: {}", peerId, newState);
+    refreshPeerState(peerId);
+  }
+
+  @Override
+  public void disablePeer(String peerId) throws ReplicationException, IOException {
+    refreshPeerState(peerId);
   }
 
   @Override
   public void updatePeerConfig(String peerId) throws ReplicationException, IOException {
     Lock peerLock = peersLock.acquireLock(peerId);
     try {
-      replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId);
+      ReplicationPeerImpl peer = replicationSourceManager.getReplicationPeers().getPeer(peerId);
+      if (peer == null) {
+        throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
+      }
+      ReplicationPeerConfig oldConfig = peer.getPeerConfig();
+      ReplicationPeerConfig newConfig =
+          replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId);
+      // RS need to start work with the new replication config change
+      if (!ReplicationUtils.isKeyConfigEqual(oldConfig, newConfig)) {
+        replicationSourceManager.refreshSources(peerId);
+      }
     } finally {
       peerLock.unlock();
     }

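The behavioral core of the patch is in the two refresh paths above: the handler
now compares old and new peer state/config under the per-peer lock and restarts
the peer's sources only when something that matters changed. For enable/disable,
only an ENABLED to DISABLED transition calls refreshSources(); for config
updates, only a "key" config change does. Condensed, the update path is
(paraphrasing the hunk above):

    Lock peerLock = peersLock.acquireLock(peerId);
    try {
      ReplicationPeerConfig oldConfig = peer.getPeerConfig();
      ReplicationPeerConfig newConfig =
          replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId);
      if (!ReplicationUtils.isKeyConfigEqual(oldConfig, newConfig)) {
        // Close the old source for this peer and start a new one that picks up
        // the changed config before the procedure reports success.
        replicationSourceManager.refreshSources(peerId);
      }
    } finally {
      peerLock.unlock();
    }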
http://git-wip-us.apache.org/repos/asf/hbase/blob/24eff344/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 7bceb78..1be9a88 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -81,7 +81,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
     ReplicationSourceWALReader walReader = new RecoveredReplicationSourceWALReader(fs,
         conf, queue, startPosition, walEntryFilter, this);
     Threads.setDaemonThreadRunning(walReader, threadName
-        + ".replicationSource.replicationWALReaderThread." + walGroupId + "," 
+ peerClusterZnode,
+        + ".replicationSource.replicationWALReaderThread." + walGroupId + "," 
+ queueId,
       getUncaughtExceptionHandler());
     return walReader;
   }
@@ -178,8 +178,8 @@ public class RecoveredReplicationSource extends ReplicationSource {
         }
       }
       if (allTasksDone) {
-        manager.closeRecoveredQueue(this);
-        LOG.info("Finished recovering queue " + peerClusterZnode + " with the 
following stats: "
+        manager.removeRecoveredSource(this);
+        LOG.info("Finished recovering queue " + queueId + " with the following 
stats: "
             + getStats());
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/24eff344/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
index 1c0bbee..38bbb48 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
@@ -76,7 +76,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
         shipEdits(entryBatch);
         if (entryBatch.getWalEntries().isEmpty()) {
           LOG.debug("Finished recovering queue for group " + walGroupId + " of 
peer "
-              + source.getPeerClusterZnode());
+              + source.getQueueId());
           source.getSourceMetrics().incrCompletedRecoveryQueue();
           setWorkerState(WorkerState.FINISHED);
           continue;
@@ -113,7 +113,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
   // normally has a position (unless the RS failed between 2 logs)
   private long getRecoveredQueueStartPos() {
     long startPosition = 0;
-    String peerClusterZnode = source.getPeerClusterZnode();
+    String peerClusterZnode = source.getQueueId();
     try {
       startPosition = this.replicationQueues.getWALPosition(source.getServerWALsBelongTo(),
         peerClusterZnode, this.queue.peek().getName());
@@ -129,8 +129,8 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
 
   @Override
   protected void updateLogPosition(long lastReadPosition) {
-    source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(),
-      lastReadPosition, true, false);
+    source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(),
+      lastReadPosition, true);
     lastLoggedPosition = lastReadPosition;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/24eff344/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 5f8d0aa..6c46a85 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
 import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
-import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
@@ -187,11 +186,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
    */
   @Override
   public void startReplicationService() throws IOException {
-    try {
-      this.replicationManager.init();
-    } catch (ReplicationException e) {
-      throw new IOException(e);
-    }
+    this.replicationManager.init();
     this.replicationSink = new ReplicationSink(this.conf, this.server);
     this.scheduleThreadPool.scheduleAtFixedRate(
       new ReplicationStatisticsTask(this.replicationSink, this.replicationManager),
@@ -210,9 +205,9 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
       throws IOException {
     try {
       this.replicationManager.addHFileRefs(tableName, family, pairs);
-    } catch (ReplicationException e) {
+    } catch (IOException e) {
       LOG.error("Failed to add hfile references in the replication queue.", e);
-      throw new IOException(e);
+      throw e;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/24eff344/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index ffed88d..0092251 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -105,7 +105,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
   // total number of edits we replicated
   private AtomicLong totalReplicatedEdits = new AtomicLong(0);
   // The znode we currently play with
-  protected String peerClusterZnode;
+  protected String queueId;
   // Maximum number of retries before taking bold actions
   private int maxRetriesMultiplier;
   // Indicates if this particular source is running
@@ -141,14 +141,14 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
    * @param fs file system to use
    * @param manager replication manager to ping to
    * @param server the server for this region server
-   * @param peerClusterZnode the name of our znode
+   * @param queueId the id of our replication queue
    * @param clusterId unique UUID for the cluster
    * @param metrics metrics for replication source
    */
   @Override
   public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
       ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
-      String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
+      String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
       MetricsSource metrics) throws IOException {
     this.server = server;
     this.conf = HBaseConfiguration.create(conf);
@@ -167,8 +167,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     this.metrics = metrics;
     this.clusterId = clusterId;
 
-    this.peerClusterZnode = peerClusterZnode;
-    this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
+    this.queueId = queueId;
+    this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
     // ReplicationQueueInfo parses the peerId out of the znode for us
     this.peerId = this.replicationQueueInfo.getPeerId();
     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
@@ -178,7 +178,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
     this.totalBufferUsed = manager.getTotalBufferUsed();
     this.walFileLengthProvider = walFileLengthProvider;
-    LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " 
+ peerId
+    LOG.info("queueId=" + queueId + ", ReplicationSource : " + peerId
         + ", currentBandwidth=" + this.currentBandwidth);
   }
 
@@ -216,12 +216,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
   @Override
   public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
       throws ReplicationException {
-    String peerId = peerClusterZnode;
-    if (peerId.contains("-")) {
-      // peerClusterZnode will be in the form peerId + "-" + rsZNode.
-      // A peerId will not have "-" in its name, see HBASE-11394
-      peerId = peerClusterZnode.split("-")[0];
-    }
     Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
     if (tableCFMap != null) {
       List<String> tableCfs = tableCFMap.get(tableName);
@@ -310,7 +304,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
           + replicationEndpoint.getClass().getName(), null, false);
-      this.manager.closeQueue(this);
+      this.manager.removeSource(this);
       return;
     }
     LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
@@ -355,7 +349,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     ReplicationSourceWALReader walReader =
         new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
     return (ReplicationSourceWALReader) Threads.setDaemonThreadRunning(walReader,
-      threadName + ".replicationSource.wal-reader." + walGroupId + "," + peerClusterZnode,
+      threadName + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
       getUncaughtExceptionHandler());
   }
 
@@ -449,7 +443,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
         LOG.error("Unexpected exception in ReplicationSource", e);
       }
     };
-    Threads.setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode,
+    Threads.setDaemonThreadRunning(this, n + ".replicationSource," + this.queueId,
       handler);
   }
 
@@ -465,9 +459,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
 
   public void terminate(String reason, Exception cause, boolean join) {
     if (cause == null) {
-      LOG.info("Closing source " + this.peerClusterZnode + " because: " + 
reason);
+      LOG.info("Closing source " + this.queueId + " because: " + reason);
     } else {
-      LOG.error("Closing source " + this.peerClusterZnode + " because an error 
occurred: " + reason,
+      LOG.error("Closing source " + this.queueId + " because an error 
occurred: " + reason,
         cause);
     }
     this.sourceRunning = false;
@@ -491,7 +485,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
               .awaitTerminated(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS);
         } catch (TimeoutException te) {
           LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :"
-              + this.peerClusterZnode,
+              + this.queueId,
             te);
         }
       }
@@ -499,8 +493,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
   }
 
   @Override
-  public String getPeerClusterZnode() {
-    return this.peerClusterZnode;
+  public String getQueueId() {
+    return this.queueId;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/24eff344/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
index 865a202..93e8331 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
@@ -32,8 +32,8 @@ public class ReplicationSourceFactory {
 
   private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceFactory.class);
 
-  static ReplicationSourceInterface create(Configuration conf, String peerId) {
-    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
+  static ReplicationSourceInterface create(Configuration conf, String queueId) {
+    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
     boolean isQueueRecovered = replicationQueueInfo.isQueueRecovered();
     ReplicationSourceInterface src;
     try {

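The factory keys off the queue id format parsed by ReplicationQueueInfo. A short
sketch of the selection behavior (illustrative; create() is package-private, so
real callers such as ReplicationSourceManager.createSource() live in the same
package):

    // A plain peer id yields a normal, persistent ReplicationSource...
    ReplicationSourceInterface normal = ReplicationSourceFactory.create(conf, "2");
    // ...while a recovered queue id yields a RecoveredReplicationSource whose only
    // job is to drain the dead region server's remaining WAL queue.
    ReplicationSourceInterface recovered = ReplicationSourceFactory.create(conf,
        "2-ip-10-46-221-101.ec2.internal,52170,1364333181125");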
http://git-wip-us.apache.org/repos/asf/hbase/blob/24eff344/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 4f10c73..d7cf9a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -51,7 +51,7 @@ public interface ReplicationSourceInterface {
    */
   void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
       ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
-      String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
+      String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
       MetricsSource metrics) throws IOException;
 
   /**
@@ -96,11 +96,11 @@ public interface ReplicationSourceInterface {
   Path getCurrentPath();
 
   /**
-   * Get the id that the source is replicating to
+   * Get the queue id that the source is replicating to
    *
-   * @return peer cluster id
+   * @return queue id
    */
-  String getPeerClusterZnode();
+  String getQueueId();
 
   /**
    * Get the id that the source is replicating to.

http://git-wip-us.apache.org/repos/asf/hbase/blob/24eff344/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 968b3fb..6965f55 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
@@ -32,7 +30,7 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
@@ -66,27 +64,53 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
- * This class is responsible to manage all the replication
- * sources. There are two classes of sources:
+ * This class is responsible to manage all the replication sources. There are two classes of
+ * sources:
  * <ul>
- * <li> Normal sources are persistent and one per peer cluster</li>
- * <li> Old sources are recovered from a failed region server and our
- * only goal is to finish replicating the WAL queue it had up in ZK</li>
+ * <li>Normal sources are persistent and one per peer cluster</li>
+ * <li>Old sources are recovered from a failed region server and our only goal is to finish
+ * replicating the WAL queue it had</li>
+ * </ul>
+ * <p>
+ * When a region server dies, this class uses a watcher to get notified and it tries to grab a lock
+ * in order to transfer all the queues in a local old source.
+ * <p>
+ * Synchronization specification:
+ * <ul>
+ * <li>No need synchronized on {@link #sources}. {@link #sources} is a ConcurrentHashMap and there
+ * is a Lock for peer id in {@link PeerProcedureHandlerImpl}. So there is no race for peer
+ * operations.</li>
+ * <li>Need synchronized on {@link #walsById}. There are four methods which modify it,
+ * {@link #addPeer(String)}, {@link #removePeer(String)},
+ * {@link #cleanOldLogs(SortedSet, String, String)} and {@link #preLogRoll(Path)}. {@link #walsById}
+ * is a ConcurrentHashMap and there is a Lock for peer id in {@link PeerProcedureHandlerImpl}. So
+ * there is no race between {@link #addPeer(String)} and {@link #removePeer(String)}.
+ * {@link #cleanOldLogs(SortedSet, String, String)} is called by {@link ReplicationSourceInterface}.
+ * So no race with {@link #addPeer(String)}. {@link #removePeer(String)} will terminate the
+ * {@link ReplicationSourceInterface} firstly, then remove the wals from {@link #walsById}. So no
+ * race with {@link #removePeer(String)}. The only case need synchronized is
+ * {@link #cleanOldLogs(SortedSet, String, String)} and {@link #preLogRoll(Path)}.</li>
+ * <li>No need synchronized on {@link #walsByIdRecoveredQueues}. There are three methods which
+ * modify it, {@link #removePeer(String)} , {@link #cleanOldLogs(SortedSet, String, String)} and
+ * {@link ReplicationSourceManager.NodeFailoverWorker#run()}.
+ * {@link #cleanOldLogs(SortedSet, String, String)} is called by {@link ReplicationSourceInterface}.
+ * {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} firstly, then
+ * remove the wals from {@link #walsByIdRecoveredQueues}. And
+ * {@link ReplicationSourceManager.NodeFailoverWorker#run()} will add the wals to
+ * {@link #walsByIdRecoveredQueues} firstly, then start up a {@link ReplicationSourceInterface}. So
+ * there is no race here. For {@link ReplicationSourceManager.NodeFailoverWorker#run()} and
+ * {@link #removePeer(String)}, there is already synchronized on {@link #oldsources}. So no need
+ * synchronized on {@link #walsByIdRecoveredQueues}.</li>
+ * <li>Need synchronized on {@link #latestPaths} to avoid the new open source miss new log.</li>
+ * <li>Need synchronized on {@link #oldsources} to avoid adding recovered source for the
+ * to-be-removed peer.</li>
  * </ul>
- *
- * When a region server dies, this class uses a watcher to get notified and it
- * tries to grab a lock in order to transfer all the queues in a local
- * old source.
- *
- * This class implements the ReplicationListener interface so that it can track changes in
- * replication state.
  */
 @InterfaceAudience.Private
 public class ReplicationSourceManager implements ReplicationListener {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ReplicationSourceManager.class);
-  // List of all the sources that read this RS's logs
-  private final List<ReplicationSourceInterface> sources;
+  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class);
+  // all the sources that read this RS's logs and every peer only has one replication source
+  private final ConcurrentMap<String, ReplicationSourceInterface> sources;
   // List of all the sources we got from died RSs
   private final List<ReplicationSourceInterface> oldsources;
   private final ReplicationQueueStorage queueStorage;
@@ -96,11 +120,16 @@ public class ReplicationSourceManager implements ReplicationListener {
   private final UUID clusterId;
   // All about stopping
   private final Server server;
+
   // All logs we are currently tracking
-  // Index structure of the map is: peer_id->logPrefix/logGroup->logs
-  private final Map<String, Map<String, SortedSet<String>>> walsById;
+  // Index structure of the map is: queue_id->logPrefix/logGroup->logs
+  // For normal replication source, the peer id is same with the queue id
+  private final ConcurrentMap<String, Map<String, SortedSet<String>>> walsById;
   // Logs for recovered sources we are currently tracking
-  private final Map<String, Map<String, SortedSet<String>>> 
walsByIdRecoveredQueues;
+  // the map is: queue_id->logPrefix/logGroup->logs
+  // For recovered source, the queue id's format is peer_id-servername-*
+  private final ConcurrentMap<String, Map<String, SortedSet<String>>> 
walsByIdRecoveredQueues;
+
   private final Configuration conf;
   private final FileSystem fs;
   // The paths to the latest log of each wal group, for new coming peers
@@ -136,22 +165,22 @@ public class ReplicationSourceManager implements ReplicationListener {
       ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
       Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
       WALFileLengthProvider walFileLengthProvider) throws IOException {
-    //CopyOnWriteArrayList is thread-safe.
-    //Generally, reading is more than modifying.
-    this.sources = new CopyOnWriteArrayList<>();
+    // CopyOnWriteArrayList is thread-safe.
+    // Generally, reading is more than modifying.
+    this.sources = new ConcurrentHashMap<>();
     this.queueStorage = queueStorage;
     this.replicationPeers = replicationPeers;
     this.replicationTracker = replicationTracker;
     this.server = server;
-    this.walsById = new HashMap<>();
+    this.walsById = new ConcurrentHashMap<>();
     this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
-    this.oldsources = new CopyOnWriteArrayList<>();
+    this.oldsources = new ArrayList<>();
     this.conf = conf;
     this.fs = fs;
     this.logDir = logDir;
     this.oldLogDir = oldLogDir;
-    this.sleepBeforeFailover =
-        conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds
+    this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000); // 30
+                                                                                         // seconds
     this.clusterId = clusterId;
     this.walFileLengthProvider = walFileLengthProvider;
     this.replicationTracker.registerListener(this);
@@ -160,8 +189,8 @@ public class ReplicationSourceManager implements ReplicationListener {
     int nbWorkers = conf.getInt("replication.executor.workers", 1);
     // use a short 100ms sleep since this could be done inline with a RS startup
     // even if we fail, other region servers can take care of it
-    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
-        100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
+    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS,
+        new LinkedBlockingQueue<>());
     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
     tfb.setNameFormat("ReplicationExecutor-%d");
     tfb.setDaemon(true);
@@ -171,74 +200,22 @@ public class ReplicationSourceManager implements ReplicationListener {
       HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
   }
 
-  @FunctionalInterface
-  private interface ReplicationQueueOperation {
-    void exec() throws ReplicationException;
-  }
-
-  private void abortWhenFail(ReplicationQueueOperation op) {
-    try {
-      op.exec();
-    } catch (ReplicationException e) {
-      server.abort("Failed to operate on replication queue", e);
-    }
-  }
-
   /**
-   * Provide the id of the peer and a log key and this method will figure which
-   * wal it belongs to and will log, for this region server, the current
-   * position. It will also clean old logs from the queue.
-   * @param log Path to the log currently being replicated from
-   * replication status in zookeeper. It will also delete older entries.
-   * @param id id of the peer cluster
-   * @param position current location in the log
-   * @param queueRecovered indicates if this queue comes from another region server
-   * @param holdLogInZK if true then the log is retained in ZK
-   */
-  public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered,
-      boolean holdLogInZK) {
-    String fileName = log.getName();
-    abortWhenFail(
-      () -> this.queueStorage.setWALPosition(server.getServerName(), id, fileName, position));
-    if (holdLogInZK) {
-      return;
-    }
-    cleanOldLogs(fileName, id, queueRecovered);
-  }
-
-  /**
-   * Cleans a log file and all older files from ZK. Called when we are sure that a
-   * log file is closed and has no more entries.
-   * @param key Path to the log
-   * @param id id of the peer cluster
-   * @param queueRecovered Whether this is a recovered queue
+   * Adds a normal source per registered peer cluster and tries to process all old region server wal
+   * queues
+   * queues
+   * <p>
+   * The returned future is for adoptAbandonedQueues task.
    */
-  public void cleanOldLogs(String key, String id, boolean queueRecovered) {
-    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(key);
-    if (queueRecovered) {
-      SortedSet<String> wals = walsByIdRecoveredQueues.get(id).get(logPrefix);
-      if (wals != null && !wals.first().equals(key)) {
-        cleanOldLogs(wals, key, id);
-      }
-    } else {
-      synchronized (this.walsById) {
-        SortedSet<String> wals = walsById.get(id).get(logPrefix);
-        if (wals != null && !wals.first().equals(key)) {
-          cleanOldLogs(wals, key, id);
-        }
+  Future<?> init() throws IOException {
+    for (String id : this.replicationPeers.getAllPeerIds()) {
+      addSource(id);
+      if (replicationForBulkLoadDataEnabled) {
+        // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
+        // when a peer was added before replication for bulk loaded data was enabled.
+        throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(id));
       }
     }
-  }
-
-  private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
-    SortedSet<String> walSet = wals.headSet(key);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
-    }
-    for (String wal : walSet) {
-      abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
-    }
-    walSet.clear();
+    return this.executor.submit(this::adoptAbandonedQueues);
   }
 
   private void adoptAbandonedQueues() {
@@ -254,8 +231,8 @@ public class ReplicationSourceManager implements ReplicationListener {
     }
     List<ServerName> otherRegionServers = replicationTracker.getListOfRegionServers().stream()
         .map(ServerName::valueOf).collect(Collectors.toList());
-    LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
-        + otherRegionServers);
+    LOG.info(
+      "Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers);
 
     // Look if there's anything to process after a restart
     for (ServerName rs : currentReplicators) {
@@ -266,56 +243,112 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   /**
-   * Adds a normal source per registered peer cluster and tries to process all old region server wal
-   * queues
-   * <p>
-   * The returned future is for adoptAbandonedQueues task.
+   * 1. Add peer to replicationPeers 2. Add the normal source and related replication queue 3. Add
+   * HFile Refs
+   * @param peerId the id of replication peer
    */
-  Future<?> init() throws IOException, ReplicationException {
-    for (String id : this.replicationPeers.getAllPeerIds()) {
-      addSource(id);
+  public void addPeer(String peerId) throws IOException {
+    boolean added = false;
+    try {
+      added = this.replicationPeers.addPeer(peerId);
+    } catch (ReplicationException e) {
+      throw new IOException(e);
+    }
+    if (added) {
+      addSource(peerId);
       if (replicationForBulkLoadDataEnabled) {
-        // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
-        // when a peer was added before replication for bulk loaded data was enabled.
-        this.queueStorage.addPeerToHFileRefs(id);
+        throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(peerId));
       }
     }
-    return this.executor.submit(this::adoptAbandonedQueues);
   }
 
   /**
-   * Add sources for the given peer cluster on this region server. For the newly added peer, we only
-   * need to enqueue the latest log of each wal group and do replication
-   * @param id the id of the peer cluster
+   * 1. Remove peer for replicationPeers 2. Remove all the recovered sources for the specified id
+   * and related replication queues 3. Remove the normal source and related replication queue 4.
+   * Remove HFile Refs
+   * @param peerId the id of the replication peer
+   */
+  public void removePeer(String peerId) {
+    replicationPeers.removePeer(peerId);
+    String terminateMessage = "Replication stream was removed by a user";
+    List<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<>();
+    // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
+    // see NodeFailoverWorker.run
+    synchronized (this.oldsources) {
+      // First close all the recovered sources for this peer
+      for (ReplicationSourceInterface src : oldsources) {
+        if (peerId.equals(src.getPeerId())) {
+          oldSourcesToDelete.add(src);
+        }
+      }
+      for (ReplicationSourceInterface src : oldSourcesToDelete) {
+        src.terminate(terminateMessage);
+        removeRecoveredSource(src);
+      }
+    }
+    LOG.info(
+      "Number of deleted recovered sources for " + peerId + ": " + 
oldSourcesToDelete.size());
+    // Now close the normal source for this peer
+    ReplicationSourceInterface srcToRemove = this.sources.get(peerId);
+    if (srcToRemove != null) {
+      srcToRemove.terminate(terminateMessage);
+      removeSource(srcToRemove);
+    } else {
+      // This only happened in unit test TestReplicationSourceManager#testPeerRemovalCleanup
+      // Delete queue from storage and memory and queue id is same with peer id for normal
+      // source
+      deleteQueue(peerId);
+      this.walsById.remove(peerId);
+    }
+
+    // Remove HFile Refs
+    abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(peerId));
+  }
+
+  /**
+   * Factory method to create a replication source
+   * @param queueId the id of the replication queue
+   * @return the created source
+   */
+  private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer)
+      throws IOException {
+    ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);
+
+    MetricsSource metrics = new MetricsSource(queueId);
+    // init replication source
+    src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
+      walFileLengthProvider, metrics);
+    return src;
+  }
+
+  /**
+   * Add a normal source for the given peer on this region server. Meanwhile, add new replication
+   * queue to storage. For the newly added peer, we only need to enqueue the latest log of each wal
+   * group and do replication
+   * @param peerId the id of the replication peer
    * @return the source that was created
    */
   @VisibleForTesting
-  ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException {
-    ReplicationPeer peer = replicationPeers.getPeer(id);
-    ReplicationSourceInterface src = getReplicationSource(id, peer);
-    synchronized (this.walsById) {
-      this.sources.add(src);
+  ReplicationSourceInterface addSource(String peerId) throws IOException {
+    ReplicationPeer peer = replicationPeers.getPeer(peerId);
+    ReplicationSourceInterface src = createSource(peerId, peer);
+    // synchronized on latestPaths to avoid missing the new log
+    synchronized (this.latestPaths) {
+      this.sources.put(peerId, src);
       Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
-      this.walsById.put(id, walsByGroup);
+      this.walsById.put(peerId, walsByGroup);
       // Add the latest wal to that source's queue
-      synchronized (latestPaths) {
-        if (this.latestPaths.size() > 0) {
-          for (Path logPath : latestPaths) {
-            String name = logPath.getName();
-            String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
-            SortedSet<String> logs = new TreeSet<>();
-            logs.add(name);
-            walsByGroup.put(walPrefix, logs);
-            try {
-              this.queueStorage.addWAL(server.getServerName(), id, name);
-            } catch (ReplicationException e) {
-              String message = "Cannot add log to queue when creating a new 
source, queueId=" + id +
-                ", filename=" + name;
-              server.stop(message);
-              throw e;
-            }
-            src.enqueueLog(logPath);
-          }
+      if (this.latestPaths.size() > 0) {
+        for (Path logPath : latestPaths) {
+          String name = logPath.getName();
+          String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
+          SortedSet<String> logs = new TreeSet<>();
+          logs.add(name);
+          walsByGroup.put(walPrefix, logs);
+          // Abort RS and throw exception to make add peer failed
+          abortAndThrowIOExceptionWhenFail(
+            () -> this.queueStorage.addWAL(server.getServerName(), peerId, name));
+          src.enqueueLog(logPath);
         }
       }
     }
@@ -323,89 +356,219 @@ public class ReplicationSourceManager implements ReplicationListener {
     return src;
   }
 
-  @VisibleForTesting
-  int getSizeOfLatestPath() {
-    synchronized (latestPaths) {
-      return latestPaths.size();
-    }
-  }
-
   /**
-   * Delete a complete queue of wals associated with a peer cluster
-   * @param peerId Id of the peer cluster queue of wals to delete
+   * Close the previous replication sources of this peer id and open new sources to trigger the new
+   * replication state changes or new replication config changes. Here we don't need to change
+   * replication queue storage and only to enqueue all logs to the new replication source
+   * @param peerId the id of the replication peer
+   * @throws IOException
    */
-  public void deleteSource(String peerId, boolean closeConnection) {
-    abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), 
peerId));
-    if (closeConnection) {
-      this.replicationPeers.removePeer(peerId);
+  public void refreshSources(String peerId) throws IOException {
+    String terminateMessage = "Peer " + peerId +
+      " state or config changed. Will close the previous replication source 
and open a new one";
+    ReplicationPeer peer = replicationPeers.getPeer(peerId);
+    ReplicationSourceInterface src = createSource(peerId, peer);
+    // synchronized on latestPaths to avoid missing the new log
+    synchronized (this.latestPaths) {
+      ReplicationSourceInterface toRemove = this.sources.put(peerId, src);
+      if (toRemove != null) {
+        LOG.info("Terminate replication source for " + toRemove.getPeerId());
+        toRemove.terminate(terminateMessage);
+      }
+      for (SortedSet<String> walsByGroup : walsById.get(peerId).values()) {
+        walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
+      }
     }
-  }
+    LOG.info("Startup replication source for " + src.getPeerId());
+    src.startup();
 
-  /**
-   * Terminate the replication on this region server
-   */
-  public void join() {
-    this.executor.shutdown();
-    for (ReplicationSourceInterface source : this.sources) {
-      source.terminate("Region server is closing");
+    List<ReplicationSourceInterface> toStartup = new ArrayList<>();
+    // synchronized on oldsources to avoid race with NodeFailoverWorker
+    synchronized (this.oldsources) {
+      List<String> previousQueueIds = new ArrayList<>();
+      // iterate with an explicit Iterator so terminated sources can be removed while scanning
+      for (Iterator<ReplicationSourceInterface> iter = this.oldsources.iterator(); iter.hasNext();) {
+        ReplicationSourceInterface oldSource = iter.next();
+        if (oldSource.getPeerId().equals(peerId)) {
+          previousQueueIds.add(oldSource.getQueueId());
+          oldSource.terminate(terminateMessage);
+          iter.remove();
+        }
+      }
+      for (String queueId : previousQueueIds) {
+        ReplicationSourceInterface replicationSource = createSource(queueId, peer);
+        this.oldsources.add(replicationSource);
+        for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) {
+          walsByGroup.forEach(wal -> replicationSource.enqueueLog(new Path(wal)));
+        }
+        toStartup.add(replicationSource);
+      }
+    }
+    for (ReplicationSourceInterface replicationSource : toStartup) {
+      replicationSource.startup();
     }
   }
 
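refreshSources works because Map.put returns the mapping it replaces: the new source becomes visible under the lock first, and only then is the old one terminated. A stripped-down sketch of that swap idiom follows; the Source interface and names are illustrative, not the patch's API.

    import java.util.HashMap;
    import java.util.Map;

    // Toy version of the terminate-and-replace swap used by refreshSources.
    public class SwapSourceSketch {
      interface Source {
        void terminate(String reason);
      }

      private final Map<String, Source> sources = new HashMap<>();

      void refresh(String peerId, Source fresh) {
        Source old;
        synchronized (sources) {
          old = sources.put(peerId, fresh); // the new source is now the visible one
        }
        if (old != null) {
          old.terminate("peer " + peerId + " state or config changed");
        }
      }

      public static void main(String[] args) {
        SwapSourceSketch m = new SwapSourceSketch();
        m.refresh("peer1", reason -> System.out.println("terminated: " + reason));
        // replacing the mapping terminates the previous source
        m.refresh("peer1", reason -> System.out.println("terminated: " + reason));
      }
    }
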
   /**
-   * Get a copy of the wals of the first source on this rs
-   * @return a sorted set of wal names
+   * Clear the metrics and related replication queue of the specified old source
+   * @param src source to clear
    */
-  @VisibleForTesting
-  Map<String, Map<String, SortedSet<String>>> getWALs() {
-    return Collections.unmodifiableMap(walsById);
+  void removeRecoveredSource(ReplicationSourceInterface src) {
+    LOG.info("Done with the recovered queue " + src.getQueueId());
+    src.getSourceMetrics().clear();
+    this.oldsources.remove(src);
+    // Delete queue from storage and memory
+    deleteQueue(src.getQueueId());
+    this.walsByIdRecoveredQueues.remove(src.getQueueId());
   }
 
   /**
-   * Get a copy of the wals of the recovered sources on this rs
-   * @return a sorted set of wal names
+   * Clear the metrics and related replication queue of the specified source
+   * @param src source to clear
    */
-  @VisibleForTesting
-  Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
-    return Collections.unmodifiableMap(walsByIdRecoveredQueues);
+  void removeSource(ReplicationSourceInterface src) {
+    LOG.info("Done with the queue " + src.getQueueId());
+    src.getSourceMetrics().clear();
+    this.sources.remove(src.getPeerId());
+    // Delete queue from storage and memory
+    deleteQueue(src.getQueueId());
+    this.walsById.remove(src.getQueueId());
   }
 
   /**
-   * Get a list of all the normal sources of this rs
-   * @return lis of all sources
+   * Delete a complete queue of wals associated with a replication source
+   * @param queueId the id of replication queue to delete
    */
-  public List<ReplicationSourceInterface> getSources() {
-    return this.sources;
+  private void deleteQueue(String queueId) {
+    abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), queueId));
+  }
+
+  @FunctionalInterface
+  private interface ReplicationQueueOperation {
+    void exec() throws ReplicationException;
+  }
+
+  private void abortWhenFail(ReplicationQueueOperation op) {
+    try {
+      op.exec();
+    } catch (ReplicationException e) {
+      server.abort("Failed to operate on replication queue", e);
+    }
+  }
+
+  private void throwIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
+    try {
+      op.exec();
+    } catch (ReplicationException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private void abortAndThrowIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
+    try {
+      op.exec();
+    } catch (ReplicationException e) {
+      server.abort("Failed to operate on replication queue", e);
+      throw new IOException(e);
+    }
   }
 
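All three wrappers above funnel through the same ReplicationQueueOperation hook and differ only in failure policy (abort, convert to IOException, or both). Here is a standalone sketch of the idiom with a stand-in exception class, since this snippet is not wired to any real queue storage:

    import java.io.IOException;

    // Standalone sketch of the checked-exception wrapper idiom; names are illustrative.
    public class QueueOpSketch {
      static class ReplicationException extends Exception {
        ReplicationException(String msg) { super(msg); }
      }

      @FunctionalInterface
      interface QueueOperation {
        void exec() throws ReplicationException;
      }

      // Same shape as throwIOExceptionWhenFail: callers that already speak
      // IOException never see the replication-specific checked type.
      static void throwIOExceptionWhenFail(QueueOperation op) throws IOException {
        try {
          op.exec();
        } catch (ReplicationException e) {
          throw new IOException(e);
        }
      }

      public static void main(String[] args) throws IOException {
        // The lambda body stands in for a queueStorage call.
        throwIOExceptionWhenFail(() -> System.out.println("queue updated"));
      }
    }
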
   /**
-   * Get a list of all the old sources of this rs
-   * @return list of all old sources
+   * This method will record the current position in storage and also clean old logs from the
+   * replication queue.
+   * @param log Path to the log currently being replicated
+   * @param queueId id of the replication queue
+   * @param position current location in the log
+   * @param queueRecovered indicates if this queue comes from another region server
    */
-  public List<ReplicationSourceInterface> getOldSources() {
-    return this.oldsources;
+  public void logPositionAndCleanOldLogs(Path log, String queueId, long position,
+      boolean queueRecovered) {
+    String fileName = log.getName();
+    abortWhenFail(
+      () -> this.queueStorage.setWALPosition(server.getServerName(), queueId, fileName, position));
+    cleanOldLogs(fileName, queueId, queueRecovered);
   }
 
   /**
-   * Get the normal source for a given peer
-   * @param peerId
-   * @return the normal source for the give peer if it exists, otherwise null.
+   * Cleans a log file and all older logs from the replication queue. Called when we are sure that
+   * a log file is closed and has no more entries.
+   * @param log Path to the log
+   * @param queueId id of the replication queue
+   * @param queueRecovered Whether this is a recovered queue
    */
-  public ReplicationSourceInterface getSource(String peerId) {
-    return getSources().stream().filter(s -> s.getPeerId().equals(peerId)).findFirst().orElse(null);
+  @VisibleForTesting
+  void cleanOldLogs(String log, String queueId, boolean queueRecovered) {
+    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
+    if (queueRecovered) {
+      SortedSet<String> wals = walsByIdRecoveredQueues.get(queueId).get(logPrefix);
+      if (wals != null && !wals.first().equals(log)) {
+        cleanOldLogs(wals, log, queueId);
+      }
+    } else {
+      // synchronized on walsById to avoid race with preLogRoll
+      synchronized (this.walsById) {
+        SortedSet<String> wals = walsById.get(queueId).get(logPrefix);
+        if (wals != null && !wals.first().equals(log)) {
+          cleanOldLogs(wals, log, queueId);
+        }
+      }
+    }
   }
 
-  @VisibleForTesting
-  List<String> getAllQueues() throws ReplicationException {
-    return queueStorage.getAllQueues(server.getServerName());
+  private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
+    SortedSet<String> walSet = wals.headSet(key);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
+    }
+    for (String wal : walSet) {
+      abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
+    }
+    walSet.clear();
   }
 
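The private cleanOldLogs above depends on a java.util contract worth spelling out: SortedSet.headSet(key) is a live view of the elements strictly before key, so clearing the view also prunes the backing set that tracks the queue. A small runnable demonstration:

    import java.util.SortedSet;
    import java.util.TreeSet;

    public class HeadSetSketch {
      public static void main(String[] args) {
        SortedSet<String> wals = new TreeSet<>();
        wals.add("rs1.10");
        wals.add("rs1.20");
        wals.add("rs1.30");
        // Everything strictly older than the wal we just finished replicating.
        SortedSet<String> older = wals.headSet("rs1.30");
        System.out.println(older); // [rs1.10, rs1.20]
        older.clear();             // a live view: this removes them from wals too
        System.out.println(wals);  // [rs1.30]
      }
    }
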
  // public because we call it in TestReplicationEmptyWALRecovery
   @VisibleForTesting
   public void preLogRoll(Path newLog) throws IOException {
-    recordLog(newLog);
     String logName = newLog.getName();
     String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
-    synchronized (latestPaths) {
+    // synchronized on latestPaths so that a newly opened source does not miss the new log
+    synchronized (this.latestPaths) {
+      // Add log to queue storage
+      for (ReplicationSourceInterface source : this.sources.values()) {
+        // If recording the log in queue storage fails, abort the RS and throw an exception so
+        // that the log roll fails
+        abortAndThrowIOExceptionWhenFail(
+          () -> this.queueStorage.addWAL(server.getServerName(), source.getQueueId(), logName));
+      }
+
+      // synchronized on walsById to avoid race with cleanOldLogs
+      synchronized (this.walsById) {
+        // Update walsById map
+        for (Map.Entry<String, Map<String, SortedSet<String>>> entry : this.walsById.entrySet()) {
+          String peerId = entry.getKey();
+          Map<String, SortedSet<String>> walsByPrefix = entry.getValue();
+          boolean existingPrefix = false;
+          for (Map.Entry<String, SortedSet<String>> walsEntry : walsByPrefix.entrySet()) {
+            SortedSet<String> wals = walsEntry.getValue();
+            if (this.sources.isEmpty()) {
+              // If there are no slaves, we don't need to keep the old wals since
+              // we only consider the last one when a new slave comes in
+              wals.clear();
+            }
+            if (logPrefix.equals(walsEntry.getKey())) {
+              wals.add(logName);
+              existingPrefix = true;
+            }
+          }
+          if (!existingPrefix) {
+            // The new log belongs to a new group, add it into this peer
+            LOG.debug("Start tracking logs for wal group " + logPrefix + " for 
peer " + peerId);
+            SortedSet<String> wals = new TreeSet<>();
+            wals.add(logName);
+            walsByPrefix.put(logPrefix, wals);
+          }
+        }
+      }
+
+      // Add to latestPaths
       Iterator<Path> iterator = latestPaths.iterator();
       while (iterator.hasNext()) {
         Path path = iterator.next();
@@ -418,89 +581,23 @@ public class ReplicationSourceManager implements ReplicationListener {
     }
   }
 
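preLogRoll groups wals by the prefix from AbstractFSWALProvider.getWALPrefixFromWALName, so each wal group gets its own SortedSet in walsByPrefix. As a rough illustration only, assuming wal names of the form <prefix>.<timestamp> (the real provider also handles meta wal suffixes and encoded server names), a hypothetical prefix helper:

    // Hypothetical stand-in for AbstractFSWALProvider.getWALPrefixFromWALName;
    // real wal names and the real parsing rules are more involved.
    public class WalPrefixSketch {
      static String prefixOf(String walName) {
        int dot = walName.lastIndexOf('.');
        return dot < 0 ? walName : walName.substring(0, dot);
      }

      public static void main(String[] args) {
        // Two rolls of the same wal group share a prefix, hence one SortedSet;
        // a name with a new prefix starts a new group in walsByPrefix.
        System.out.println(prefixOf("host%2C16020%2C1518600000000.1518600001000"));
        System.out.println(prefixOf("host%2C16020%2C1518600000000.1518600002000"));
      }
    }
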
-  /**
-   * Check and enqueue the given log to the correct source. If there's still no source for the
-   * group to which the given log belongs, create one
-   * @param logPath the log path to check and enqueue
-   * @throws IOException
-   */
-  private void recordLog(Path logPath) throws IOException {
-    String logName = logPath.getName();
-    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
-    // update replication queues on ZK
-    // synchronize on replicationPeers to avoid adding source for the to-be-removed peer
-    synchronized (replicationPeers) {
-      for (String id : replicationPeers.getAllPeerIds()) {
-        try {
-          this.queueStorage.addWAL(server.getServerName(), id, logName);
-        } catch (ReplicationException e) {
-          throw new IOException("Cannot add log to replication queue"
-              + " when creating a new source, queueId=" + id + ", filename=" + 
logName, e);
-        }
-      }
-    }
-    // update walsById map
-    synchronized (walsById) {
-      for (Map.Entry<String, Map<String, SortedSet<String>>> entry : this.walsById.entrySet()) {
-        String peerId = entry.getKey();
-        Map<String, SortedSet<String>> walsByPrefix = entry.getValue();
-        boolean existingPrefix = false;
-        for (Map.Entry<String, SortedSet<String>> walsEntry : walsByPrefix.entrySet()) {
-          SortedSet<String> wals = walsEntry.getValue();
-          if (this.sources.isEmpty()) {
-            // If there's no slaves, don't need to keep the old wals since
-            // we only consider the last one when a new slave comes in
-            wals.clear();
-          }
-          if (logPrefix.equals(walsEntry.getKey())) {
-            wals.add(logName);
-            existingPrefix = true;
-          }
-        }
-        if (!existingPrefix) {
-          // The new log belongs to a new group, add it into this peer
-          LOG.debug("Start tracking logs for wal group " + logPrefix + " for 
peer " + peerId);
-          SortedSet<String> wals = new TreeSet<>();
-          wals.add(logName);
-          walsByPrefix.put(logPrefix, wals);
-        }
-      }
-    }
-  }
-
  // public because we call it in TestReplicationEmptyWALRecovery
   @VisibleForTesting
   public void postLogRoll(Path newLog) throws IOException {
     // This only updates the sources we own, not the recovered ones
-    for (ReplicationSourceInterface source : this.sources) {
+    for (ReplicationSourceInterface source : this.sources.values()) {
       source.enqueueLog(newLog);
     }
   }
 
-  @VisibleForTesting
-  public AtomicLong getTotalBufferUsed() {
-    return totalBufferUsed;
-  }
-
-  /**
-   * Factory method to create a replication source
-   * @param peerId the id of the peer cluster
-   * @return the created source
-   */
-  private ReplicationSourceInterface getReplicationSource(String peerId,
-      ReplicationPeer replicationPeer) throws IOException {
-    ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, 
peerId);
-
-    MetricsSource metrics = new MetricsSource(peerId);
-    // init replication source
-    src.init(conf, fs, this, queueStorage, replicationPeer, server, peerId, clusterId,
-      walFileLengthProvider, metrics);
-    return src;
+  @Override
+  public void regionServerRemoved(String regionserver) {
+    transferQueues(ServerName.valueOf(regionserver));
   }
 
   /**
   * Transfer all the queues of the specified server to this region server. First it tries to grab a lock
-   * and if it works it will move the znodes and finally will delete the old znodes.
+   * and if it works it will move the old queues and finally will delete the old queues.
    * <p>
    * It creates one old source for any type of source of the old rs.
    */
@@ -518,102 +615,8 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   /**
-   * Clear the references to the specified old source
-   * @param src source to clear
-   */
-  public void closeRecoveredQueue(ReplicationSourceInterface src) {
-    LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
-    if (src instanceof ReplicationSource) {
-      ((ReplicationSource) src).getSourceMetrics().clear();
-    }
-    this.oldsources.remove(src);
-    deleteSource(src.getPeerClusterZnode(), false);
-    this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
-  }
-
-  /**
-   * Clear the references to the specified old source
-   * @param src source to clear
-   */
-  public void closeQueue(ReplicationSourceInterface src) {
-    LOG.info("Done with the queue " + src.getPeerClusterZnode());
-    src.getSourceMetrics().clear();
-    this.sources.remove(src);
-    deleteSource(src.getPeerClusterZnode(), true);
-    this.walsById.remove(src.getPeerClusterZnode());
-  }
-
-  public void addPeer(String id) throws ReplicationException, IOException {
-    LOG.info("Trying to add peer, peerId: " + id);
-    boolean added = this.replicationPeers.addPeer(id);
-    if (added) {
-      LOG.info("Peer " + id + " connected success, trying to start the 
replication source thread.");
-      addSource(id);
-      if (replicationForBulkLoadDataEnabled) {
-        this.queueStorage.addPeerToHFileRefs(id);
-      }
-    }
-  }
-
-  /**
-   * Thie method first deletes all the recovered sources for the specified
-   * id, then deletes the normal source (deleting all related data in ZK).
-   * @param id The id of the peer cluster
-   */
-  public void removePeer(String id) {
-    LOG.info("Closing the following queue " + id + ", currently have "
-        + sources.size() + " and another "
-        + oldsources.size() + " that were recovered");
-    String terminateMessage = "Replication stream was removed by a user";
-    List<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<>();
-    // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
-    // see NodeFailoverWorker.run
-    synchronized (oldsources) {
-      // First close all the recovered sources for this peer
-      for (ReplicationSourceInterface src : oldsources) {
-        if (id.equals(src.getPeerId())) {
-          oldSourcesToDelete.add(src);
-        }
-      }
-      for (ReplicationSourceInterface src : oldSourcesToDelete) {
-        src.terminate(terminateMessage);
-        closeRecoveredQueue(src);
-      }
-    }
-    LOG.info("Number of deleted recovered sources for " + id + ": "
-        + oldSourcesToDelete.size());
-    // Now look for the one on this cluster
-    List<ReplicationSourceInterface> srcToRemove = new ArrayList<>();
-    // synchronize on replicationPeers to avoid adding source for the to-be-removed peer
-    synchronized (this.replicationPeers) {
-      for (ReplicationSourceInterface src : this.sources) {
-        if (id.equals(src.getPeerId())) {
-          srcToRemove.add(src);
-        }
-      }
-      if (srcToRemove.isEmpty()) {
-        LOG.error("The peer we wanted to remove is missing a 
ReplicationSourceInterface. " +
-            "This could mean that ReplicationSourceInterface initialization 
failed for this peer " +
-            "and that replication on this peer may not be caught up. peerId=" 
+ id);
-      }
-      for (ReplicationSourceInterface toRemove : srcToRemove) {
-        toRemove.terminate(terminateMessage);
-        closeQueue(toRemove);
-      }
-      deleteSource(id, true);
-    }
-    // Remove HFile Refs znode from zookeeper
-    abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(id));
-  }
-
-  @Override
-  public void regionServerRemoved(String regionserver) {
-    transferQueues(ServerName.valueOf(regionserver));
-  }
-
-  /**
-   * Class responsible to setup new ReplicationSources to take care of the
-   * queues from dead region servers.
+   * Class responsible for setting up new ReplicationSources to take care of the queues from dead
+   * region servers.
    */
   class NodeFailoverWorker extends Thread {
 
@@ -643,10 +646,10 @@ public class ReplicationSourceManager implements ReplicationListener {
       }
       Map<String, Set<String>> newQueues = new HashMap<>();
       try {
-        List<String> peers = queueStorage.getAllQueues(deadRS);
-        while (!peers.isEmpty()) {
+        List<String> queues = queueStorage.getAllQueues(deadRS);
+        while (!queues.isEmpty()) {
          Pair<String, SortedSet<String>> peer = queueStorage.claimQueue(deadRS,
-            peers.get(ThreadLocalRandom.current().nextInt(peers.size())), server.getServerName());
+            queues.get(ThreadLocalRandom.current().nextInt(queues.size())), server.getServerName());
           long sleep = sleepBeforeFailover / 2;
           if (!peer.getSecond().isEmpty()) {
             newQueues.put(peer.getFirst(), peer.getSecond());
@@ -658,9 +661,9 @@ public class ReplicationSourceManager implements ReplicationListener {
             LOG.warn("Interrupted while waiting before transferring a queue.");
             Thread.currentThread().interrupt();
           }
-          peers = queueStorage.getAllQueues(deadRS);
+          queues = queueStorage.getAllQueues(deadRS);
         }
-        if (!peers.isEmpty()) {
+        if (queues.isEmpty()) {
           queueStorage.removeReplicatorIfQueueIsEmpty(deadRS);
         }
       } catch (ReplicationException e) {
@@ -675,18 +678,18 @@ public class ReplicationSourceManager implements ReplicationListener {
       }
 
       for (Map.Entry<String, Set<String>> entry : newQueues.entrySet()) {
-        String peerId = entry.getKey();
+        String queueId = entry.getKey();
         Set<String> walsSet = entry.getValue();
         try {
          // there is no actual peer defined corresponding to peerId for the failover.
-          ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
+          ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
           String actualPeerId = replicationQueueInfo.getPeerId();
 
           ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
           if (peer == null) {
-            LOG.warn("Skipping failover for peer:" + actualPeerId + " of node 
" + deadRS
-                + ", peer is null");
-            abortWhenFail(() -> 
queueStorage.removeQueue(server.getServerName(), peerId));
+            LOG.warn("Skipping failover for peer:" + actualPeerId + " of node 
" + deadRS +
+              ", peer is null");
+            abortWhenFail(() -> 
queueStorage.removeQueue(server.getServerName(), queueId));
             continue;
           }
           if (server instanceof ReplicationSyncUp.DummyServer
@@ -698,7 +701,7 @@ public class ReplicationSourceManager implements ReplicationListener {
           }
           // track sources in walsByIdRecoveredQueues
           Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
-          walsByIdRecoveredQueues.put(peerId, walsByGroup);
+          walsByIdRecoveredQueues.put(queueId, walsByGroup);
           for (String wal : walsSet) {
            String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
             SortedSet<String> wals = walsByGroup.get(walPrefix);
@@ -709,14 +712,12 @@ public class ReplicationSourceManager implements ReplicationListener {
             wals.add(wal);
           }
 
-          // enqueue sources
-          ReplicationSourceInterface src = getReplicationSource(peerId, peer);
+          ReplicationSourceInterface src = createSource(queueId, peer);
          // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
-          // see removePeer
           synchronized (oldsources) {
             if (!replicationPeers.getAllPeerIds().contains(src.getPeerId())) {
               src.terminate("Recovered queue doesn't belong to any current 
peer");
-              closeRecoveredQueue(src);
+              removeRecoveredSource(src);
               continue;
             }
             oldsources.add(src);
@@ -734,6 +735,82 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
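Throughout NodeFailoverWorker, the rename from peerId to queueId matters because a claimed queue's id is not a bare peer id: by convention it also carries the dead servers the queue has travelled through, and ReplicationQueueInfo digs the peer id back out. A hypothetical parser, assuming ids of the form <peerId>-<deadServer1>[-<deadServer2>...]:

    // Illustrative only; ReplicationQueueInfo's real parsing is the authority.
    public class QueueIdSketch {
      static String peerIdOf(String queueId) {
        int dash = queueId.indexOf('-');
        return dash < 0 ? queueId : queueId.substring(0, dash);
      }

      public static void main(String[] args) {
        System.out.println(peerIdOf("1"));                      // normal queue -> "1"
        System.out.println(peerIdOf("1-rs2,16020,1518600000")); // recovered queue -> "1"
      }
    }
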
   /**
+   * Terminate the replication on this region server
+   */
+  public void join() {
+    this.executor.shutdown();
+    for (ReplicationSourceInterface source : this.sources.values()) {
+      source.terminate("Region server is closing");
+    }
+  }
+
+  /**
+   * Get a copy of the wals of the normal sources on this rs
+   * @return a sorted set of wal names
+   */
+  @VisibleForTesting
+  Map<String, Map<String, SortedSet<String>>> getWALs() {
+    return Collections.unmodifiableMap(walsById);
+  }
+
+  /**
+   * Get a copy of the wals of the recovered sources on this rs
+   * @return a sorted set of wal names
+   */
+  @VisibleForTesting
+  Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
+    return Collections.unmodifiableMap(walsByIdRecoveredQueues);
+  }
+
+  /**
+   * Get a list of all the normal sources of this rs
+   * @return list of all normal sources
+   */
+  public List<ReplicationSourceInterface> getSources() {
+    return new ArrayList<>(this.sources.values());
+  }
+
+  /**
+   * Get a list of all the recovered sources of this rs
+   * @return list of all recovered sources
+   */
+  public List<ReplicationSourceInterface> getOldSources() {
+    return this.oldsources;
+  }
+
+  /**
+   * Get the normal source for a given peer
+   * @return the normal source for the given peer if it exists, otherwise null.
+   */
+  @VisibleForTesting
+  public ReplicationSourceInterface getSource(String peerId) {
+    return this.sources.get(peerId);
+  }
+
+  @VisibleForTesting
+  List<String> getAllQueues() throws IOException {
+    List<String> allQueues = Collections.emptyList();
+    try {
+      allQueues = queueStorage.getAllQueues(server.getServerName());
+    } catch (ReplicationException e) {
+      throw new IOException(e);
+    }
+    return allQueues;
+  }
+
+  @VisibleForTesting
+  int getSizeOfLatestPath() {
+    synchronized (latestPaths) {
+      return latestPaths.size();
+    }
+  }
+
+  @VisibleForTesting
+  public AtomicLong getTotalBufferUsed() {
+    return totalBufferUsed;
+  }
+
+  /**
    * Get the directory where wals are archived
    * @return the directory where wals are archived
    */
@@ -761,28 +838,30 @@ public class ReplicationSourceManager implements ReplicationListener {
    * Get the ReplicationPeers used by this ReplicationSourceManager
    * @return the ReplicationPeers used by this ReplicationSourceManager
    */
-  public ReplicationPeers getReplicationPeers() {return this.replicationPeers;}
+  public ReplicationPeers getReplicationPeers() {
+    return this.replicationPeers;
+  }
 
   /**
    * Get a string representation of all the sources' metrics
    */
   public String getStats() {
     StringBuilder stats = new StringBuilder();
-    for (ReplicationSourceInterface source : sources) {
+    for (ReplicationSourceInterface source : this.sources.values()) {
       stats.append("Normal source for cluster " + source.getPeerId() + ": ");
       stats.append(source.getStats() + "\n");
     }
     for (ReplicationSourceInterface oldSource : oldsources) {
-      stats.append("Recovered source for cluster/machine(s) " + 
oldSource.getPeerId()+": ");
-      stats.append(oldSource.getStats()+ "\n");
+      stats.append("Recovered source for cluster/machine(s) " + 
oldSource.getPeerId() + ": ");
+      stats.append(oldSource.getStats() + "\n");
     }
     return stats.toString();
   }
 
  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
-      throws ReplicationException {
-    for (ReplicationSourceInterface source : this.sources) {
-      source.addHFileRefs(tableName, family, pairs);
+      throws IOException {
+    for (ReplicationSourceInterface source : this.sources.values()) {
+      throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs));
     }
   }
 

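The new refreshSources entry point is what lets the peer procedure be synchronous end to end: a region server's procedure handler can call it inline, so the master-side procedure only finishes after every RS is working against the new peer state. A hypothetical sketch of such a callback follows; this is not the PeerProcedureHandlerImpl changed by this commit, and the names are illustrative.

    import java.io.IOException;

    public class PeerChangeSketch {
      // Narrowed, illustrative view of the manager method added above.
      interface SourceManager {
        void refreshSources(String peerId) throws IOException;
      }

      private final SourceManager manager;

      PeerChangeSketch(SourceManager manager) {
        this.manager = manager;
      }

      // Invoked once the master has persisted the new peer config; the swap
      // happens before we return, so the procedure observes a finished RS.
      void onPeerConfigChanged(String peerId) throws IOException {
        manager.refreshSources(peerId);
      }
    }
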
http://git-wip-us.apache.org/repos/asf/hbase/blob/24eff344/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index ced2980..959f676 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -223,15 +223,15 @@ public class ReplicationSourceShipper extends Thread {
   }
 
   protected void updateLogPosition(long lastReadPosition) {
-    source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(),
-      lastReadPosition, false, false);
+    source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(),
+      lastReadPosition, false);
     lastLoggedPosition = lastReadPosition;
   }
 
   public void startup(UncaughtExceptionHandler handler) {
     String name = Thread.currentThread().getName();
     Threads.setDaemonThreadRunning(this, name + ".replicationSource." + walGroupId + ","
-        + source.getPeerClusterZnode(), handler);
+        + source.getQueueId(), handler);
   }
 
   public PriorityBlockingQueue<Path> getLogQueue() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/24eff344/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index b6b50ad..579d20f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -111,7 +111,7 @@ public class ReplicationSourceWALReader extends Thread {
         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 
minutes @ 1 sec per
     this.eofAutoRecovery = 
conf.getBoolean("replication.source.eof.autorecovery", false);
     this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
-    LOG.info("peerClusterZnode=" + source.getPeerClusterZnode()
+    LOG.info("peerClusterZnode=" + source.getQueueId()
         + ", ReplicationSourceWALReaderThread : " + source.getPeerId()
         + " inited, replicationBatchSizeCapacity=" + 
replicationBatchSizeCapacity
         + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity

http://git-wip-us.apache.org/repos/asf/hbase/blob/24eff344/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 38ec598..ff20ddc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -89,7 +89,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   }
 
   @Override
-  public String getPeerClusterZnode() {
+  public String getQueueId() {
     return peerClusterId;
   }
 
