HBASE-19636 All RSs should already start working with the new peer change when the replication peer procedure is finished

Signed-off-by: zhangduo <zhang...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/69fac916
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/69fac916
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/69fac916

Branch: refs/heads/HBASE-19397-branch-2
Commit: 69fac916567cf40f0018faea2471eb93a8a827b9
Parents: 83e6522
Author: Guanghao Zhang <zg...@apache.org>
Authored: Thu Jan 4 16:58:01 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Thu Jan 11 17:11:23 2018 +0800

----------------------------------------------------------------------
 .../replication/ReplicationPeerConfig.java      |   1 -
 .../hbase/replication/ReplicationPeerImpl.java  |   4 +-
 .../hbase/replication/ReplicationQueueInfo.java |  23 +-
 .../hbase/replication/ReplicationUtils.java     |  56 ++
 .../replication/TestReplicationStateZKImpl.java |  22 -
 .../regionserver/ReplicationSourceService.java  |   3 +-
 .../regionserver/PeerProcedureHandler.java      |   3 +
 .../regionserver/PeerProcedureHandlerImpl.java  |  50 +-
 .../RecoveredReplicationSource.java             |   6 +-
 .../RecoveredReplicationSourceShipper.java      |   8 +-
 .../replication/regionserver/Replication.java   |  15 +-
 .../regionserver/ReplicationSource.java         |  34 +-
 .../regionserver/ReplicationSourceFactory.java  |   4 +-
 .../ReplicationSourceInterface.java             |   8 +-
 .../regionserver/ReplicationSourceManager.java  | 895 ++++++++++---------
 .../regionserver/ReplicationSourceShipper.java  |   6 +-
 .../ReplicationSourceWALReader.java             |   2 +-
 .../replication/ReplicationSourceDummy.java     |   2 +-
 .../replication/TestNamespaceReplication.java   |  57 +-
 .../TestReplicationSourceManager.java           |  11 +-
 .../TestReplicationSourceManagerZkImpl.java     |   1 -
 21 files changed, 659 insertions(+), 552 deletions(-)
----------------------------------------------------------------------

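In short: the region server side now applies a peer change synchronously inside the refresh call issued by the peer procedure. PeerProcedureHandlerImpl compares the cached peer state/config against the freshly reloaded one and, when the change actually affects replication, rebuilds the sources before returning, so the master-side procedure only finishes once every RS has taken the change into account. A condensed sketch of the new handler logic (locking and error handling omitted; see the full patch below):

    private void refreshPeerState(String peerId) throws ReplicationException, IOException {
      ReplicationPeerImpl peer = replicationSourceManager.getReplicationPeers().getPeer(peerId);
      PeerState oldState = peer.getPeerState();
      PeerState newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
      // an ENABLED -> DISABLED transition must restart the sources for this peer
      if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) {
        replicationSourceManager.refreshSources(peerId);
      }
    }
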

http://git-wip-us.apache.org/repos/asf/hbase/blob/69fac916/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index fdae288..bf8d030 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;

http://git-wip-us.apache.org/repos/asf/hbase/blob/69fac916/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
index 3e17025..604e0bb 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -28,6 +27,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
 public class ReplicationPeerImpl implements ReplicationPeer {
+
   private final Configuration conf;
 
   private final String id;

http://git-wip-us.apache.org/repos/asf/hbase/blob/69fac916/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
index ecd888f..cd65f9b 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.ServerName;
 
 /**
- * This class is responsible for the parsing logic for a znode representing a queue.
+ * This class is responsible for the parsing logic for a queue id representing a queue.
  * It will extract the peerId if it's recovered as well as the dead region servers
  * that were part of the queue's history.
  */
@@ -38,21 +38,20 @@ public class ReplicationQueueInfo {
   private static final Logger LOG = LoggerFactory.getLogger(ReplicationQueueInfo.class);
 
   private final String peerId;
-  private final String peerClusterZnode;
+  private final String queueId;
   private boolean queueRecovered;
   // List of all the dead region servers that had this queue (if recovered)
   private List<ServerName> deadRegionServers = new ArrayList<>();
 
   /**
-   * The passed znode will be either the id of the peer cluster or
-   * the handling story of that queue in the form of id-servername-*
+   * The passed queueId will be either the id of the peer or the handling story of that queue
+   * in the form of id-servername-*
    */
-  public ReplicationQueueInfo(String znode) {
-    this.peerClusterZnode = znode;
-    String[] parts = znode.split("-", 2);
+  public ReplicationQueueInfo(String queueId) {
+    this.queueId = queueId;
+    String[] parts = queueId.split("-", 2);
     this.queueRecovered = parts.length != 1;
-    this.peerId = this.queueRecovered ?
-        parts[0] : peerClusterZnode;
+    this.peerId = this.queueRecovered ? parts[0] : queueId;
     if (parts.length >= 2) {
       // extract dead servers
       extractDeadServersFromZNodeString(parts[1], this.deadRegionServers);
@@ -60,7 +59,7 @@ public class ReplicationQueueInfo {
   }
 
   /**
-   * Parse dead server names from znode string servername can contain "-" such as
+   * Parse dead server names from queue id. servername can contain "-" such as
    * "ip-10-46-221-101.ec2.internal", so we need skip some "-" during parsing for the following
    * cases: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125-&lt;server name>-...
    */
@@ -119,8 +118,8 @@ public class ReplicationQueueInfo {
     return this.peerId;
   }
 
-  public String getPeerClusterZnode() {
-    return this.peerClusterZnode;
+  public String getQueueId() {
+    return this.queueId;
   }
 
   public boolean isQueueRecovered() {

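The rename from peerClusterZnode to queueId is naming only: the string is still either a bare peer id or, for a recovered queue, the peer id followed by the dead servers that previously owned the queue. An illustrative use of the parser, with a queue id shaped like the javadoc example above:

    // "2" -> live queue for peer 2; not recovered, no dead servers
    ReplicationQueueInfo live = new ReplicationQueueInfo("2");
    // "2-ip-10-46-221-101.ec2.internal,52170,1364333181125" -> recovered queue for peer 2,
    // previously owned by the listed (now dead) region server
    ReplicationQueueInfo recovered =
        new ReplicationQueueInfo("2-ip-10-46-221-101.ec2.internal,52170,1364333181125");
    recovered.isQueueRecovered(); // true
    recovered.getPeerId();        // "2"
    recovered.getQueueId();       // the full string passed in
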
http://git-wip-us.apache.org/repos/asf/hbase/blob/69fac916/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index 7b676ca..ebe68a7 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -18,11 +18,15 @@
 package org.apache.hadoop.hbase.replication;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -66,4 +70,56 @@ public final class ReplicationUtils {
       queueStorage.removeReplicatorIfQueueIsEmpty(replicator);
     }
   }
+
+  private static boolean isCollectionEqual(Collection<String> c1, Collection<String> c2) {
+    if (c1 == null) {
+      return c2 == null;
+    }
+    if (c2 == null) {
+      return false;
+    }
+    return c1.size() == c2.size() && c1.containsAll(c2);
+  }
+
+  private static boolean isNamespacesEqual(Set<String> ns1, Set<String> ns2) {
+    return isCollectionEqual(ns1, ns2);
+  }
+
+  private static boolean isTableCFsEqual(Map<TableName, List<String>> tableCFs1,
+      Map<TableName, List<String>> tableCFs2) {
+    if (tableCFs1 == null) {
+      return tableCFs2 == null;
+    }
+    if (tableCFs2 == null) {
+      return false;
+    }
+    if (tableCFs1.size() != tableCFs2.size()) {
+      return false;
+    }
+    for (Map.Entry<TableName, List<String>> entry1 : tableCFs1.entrySet()) {
+      TableName table = entry1.getKey();
+      if (!tableCFs2.containsKey(table)) {
+        return false;
+      }
+      List<String> cfs1 = entry1.getValue();
+      List<String> cfs2 = tableCFs2.get(table);
+      if (!isCollectionEqual(cfs1, cfs2)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public static boolean isKeyConfigEqual(ReplicationPeerConfig rpc1, ReplicationPeerConfig rpc2) {
+    if (rpc1.replicateAllUserTables() != rpc2.replicateAllUserTables()) {
+      return false;
+    }
+    if (rpc1.replicateAllUserTables()) {
+      return isNamespacesEqual(rpc1.getExcludeNamespaces(), rpc2.getExcludeNamespaces()) &&
+        isTableCFsEqual(rpc1.getExcludeTableCFsMap(), rpc2.getExcludeTableCFsMap());
+    } else {
+      return isNamespacesEqual(rpc1.getNamespaces(), rpc2.getNamespaces()) &&
+        isTableCFsEqual(rpc1.getTableCFsMap(), rpc2.getTableCFsMap());
+    }
+  }
 }

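The new isKeyConfigEqual helper compares only the parts of a peer config that determine which edits a source ships: the replicate-all flag plus the (exclude-)namespaces and (exclude-)table-CFs maps, with collections compared order-insensitively via size and containsAll. A minimal caller-side sketch (hypothetical helper, mirroring how PeerProcedureHandlerImpl uses it further down):

    // Only a change of replication scope forces a source rebuild; updates that leave these
    // key fields untouched (e.g. bandwidth tweaks) keep the existing sources running.
    boolean needsSourceRefresh(ReplicationPeerConfig oldConfig, ReplicationPeerConfig newConfig) {
      return !ReplicationUtils.isKeyConfigEqual(oldConfig, newConfig);
    }
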
http://git-wip-us.apache.org/repos/asf/hbase/blob/69fac916/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
index 6825c36..2790bd0 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
@@ -18,9 +18,7 @@
 package org.apache.hadoop.hbase.replication;
 
 import java.io.IOException;
-
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.ClusterId;
 import org.apache.hadoop.hbase.HBaseZKTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
@@ -37,14 +35,10 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @Category({ ReplicationTests.class, MediumTests.class })
 public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
 
-  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateZKImpl.class);
-
   private static Configuration conf;
   private static HBaseZKTestingUtility utility;
   private static ZKWatcher zkw;
@@ -92,20 +86,4 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
   public static void tearDownAfterClass() throws Exception {
     utility.shutdownMiniZKCluster();
   }
-
-  private static class WarnOnlyAbortable implements Abortable {
-
-    @Override
-    public void abort(String why, Throwable e) {
-      LOG.warn("TestReplicationStateZKImpl received abort, ignoring.  Reason: 
" + why);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(e.toString(), e);
-      }
-    }
-
-    @Override
-    public boolean isAborted() {
-      return false;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/69fac916/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
index a82fa3d..2aef0a8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/hbase/blob/69fac916/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
index b392985..65da9af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
@@ -23,6 +23,9 @@ import java.io.IOException;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.yetus.audience.InterfaceAudience;
 
+/**
+ * A handler for modifying replication peer in peer procedures.
+ */
 @InterfaceAudience.Private
 public interface PeerProcedureHandler {
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/69fac916/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
index c09c6a0..ce8fdae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
@@ -15,21 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
 import java.util.concurrent.locks.Lock;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.util.KeyLocker;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @InterfaceAudience.Private
 public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
-  private static final Logger LOG = LoggerFactory.getLogger(PeerProcedureHandlerImpl.class);
 
   private final ReplicationSourceManager replicationSourceManager;
   private final KeyLocker<String> peersLock = new KeyLocker<>();
@@ -39,7 +38,7 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
   }
 
   @Override
-  public void addPeer(String peerId) throws ReplicationException, IOException {
+  public void addPeer(String peerId) throws IOException {
     Lock peerLock = peersLock.acquireLock(peerId);
     try {
       replicationSourceManager.addPeer(peerId);
@@ -49,7 +48,7 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
   }
 
   @Override
-  public void removePeer(String peerId) throws ReplicationException, IOException {
+  public void removePeer(String peerId) throws IOException {
     Lock peerLock = peersLock.acquireLock(peerId);
     try {
       if (replicationSourceManager.getReplicationPeers().getPeer(peerId) != null) {
@@ -60,35 +59,50 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
     }
   }
 
-  @Override
-  public void disablePeer(String peerId) throws ReplicationException, IOException {
+  private void refreshPeerState(String peerId) throws ReplicationException, IOException {
     PeerState newState;
     Lock peerLock = peersLock.acquireLock(peerId);
     try {
+      ReplicationPeerImpl peer = replicationSourceManager.getReplicationPeers().getPeer(peerId);
+      if (peer == null) {
+        throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
+      }
+      PeerState oldState = peer.getPeerState();
       newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
+      // RS need to start work with the new replication state change
+      if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) {
+        replicationSourceManager.refreshSources(peerId);
+      }
     } finally {
       peerLock.unlock();
     }
-    LOG.info("disable replication peer, id: {}, new state: {}", peerId, 
newState);
   }
 
   @Override
   public void enablePeer(String peerId) throws ReplicationException, IOException {
-    PeerState newState;
-    Lock peerLock = peersLock.acquireLock(peerId);
-    try {
-      newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
-    } finally {
-      peerLock.unlock();
-    }
-    LOG.info("enable replication peer, id: {}, new state: {}", peerId, 
newState);
+    refreshPeerState(peerId);
+  }
+
+  @Override
+  public void disablePeer(String peerId) throws ReplicationException, IOException {
+    refreshPeerState(peerId);
   }
 
   @Override
   public void updatePeerConfig(String peerId) throws ReplicationException, IOException {
     Lock peerLock = peersLock.acquireLock(peerId);
     try {
-      replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId);
+      ReplicationPeerImpl peer = replicationSourceManager.getReplicationPeers().getPeer(peerId);
+      if (peer == null) {
+        throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
+      }
+      ReplicationPeerConfig oldConfig = peer.getPeerConfig();
+      ReplicationPeerConfig newConfig =
+          replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId);
+      // RS need to start work with the new replication config change
+      if (!ReplicationUtils.isKeyConfigEqual(oldConfig, newConfig)) {
+        replicationSourceManager.refreshSources(peerId);
+      }
     } finally {
       peerLock.unlock();
     }

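Note that every handler method above takes the same per-peer lock from the KeyLocker before touching cached state, so concurrent peer procedures for one peer are serialized on the region server while different peers proceed in parallel; this is also why enablePeer and disablePeer simply delegate to refreshPeerState. The locking pattern used throughout the class:

    Lock peerLock = peersLock.acquireLock(peerId);
    try {
      // reload the cached peer state/config and, if the change matters, refreshSources(peerId)
    } finally {
      peerLock.unlock();
    }
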
http://git-wip-us.apache.org/repos/asf/hbase/blob/69fac916/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 7bceb78..1be9a88 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -81,7 +81,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
     ReplicationSourceWALReader walReader = new RecoveredReplicationSourceWALReader(fs,
         conf, queue, startPosition, walEntryFilter, this);
     Threads.setDaemonThreadRunning(walReader, threadName
-        + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode,
+        + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + queueId,
       getUncaughtExceptionHandler());
     return walReader;
   }
@@ -178,8 +178,8 @@ public class RecoveredReplicationSource extends ReplicationSource {
         }
       }
       if (allTasksDone) {
-        manager.closeRecoveredQueue(this);
-        LOG.info("Finished recovering queue " + peerClusterZnode + " with the following stats: "
+        manager.removeRecoveredSource(this);
+        LOG.info("Finished recovering queue " + queueId + " with the following stats: "
             + getStats());
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/69fac916/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
index fb365bc..1e45496 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
@@ -77,7 +77,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
         if (entryBatch.getWalEntries().isEmpty()
             && entryBatch.getLastSeqIds().isEmpty()) {
           LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
-              + source.getPeerClusterZnode());
+              + source.getQueueId());
           source.getSourceMetrics().incrCompletedRecoveryQueue();
           setWorkerState(WorkerState.FINISHED);
           continue;
@@ -114,7 +114,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
   // normally has a position (unless the RS failed between 2 logs)
   private long getRecoveredQueueStartPos() {
     long startPosition = 0;
-    String peerClusterZnode = source.getPeerClusterZnode();
+    String peerClusterZnode = source.getQueueId();
     try {
       startPosition = this.replicationQueues.getWALPosition(source.getServerWALsBelongTo(),
         peerClusterZnode, this.queue.peek().getName());
@@ -130,8 +130,8 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
 
   @Override
   protected void updateLogPosition(long lastReadPosition) {
-    source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(),
-      lastReadPosition, true, false);
+    source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(),
+      lastReadPosition, true);
     lastLoggedPosition = lastReadPosition;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/69fac916/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index dca2439..d1a3266 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
 import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
@@ -59,10 +58,10 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
-
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 /**
  * Gateway to Replication.  Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
  */
@@ -218,11 +217,7 @@ public class Replication implements
    * @throws IOException
    */
   public void startReplicationService() throws IOException {
-    try {
-      this.replicationManager.init();
-    } catch (ReplicationException e) {
-      throw new IOException(e);
-    }
+    this.replicationManager.init();
     this.replicationSink = new ReplicationSink(this.conf, this.server);
     this.scheduleThreadPool.scheduleAtFixedRate(
       new ReplicationStatisticsThread(this.replicationSink, this.replicationManager),
@@ -280,9 +275,9 @@ public class Replication implements
       throws IOException {
     try {
       this.replicationManager.addHFileRefs(tableName, family, pairs);
-    } catch (ReplicationException e) {
+    } catch (IOException e) {
       LOG.error("Failed to add hfile references in the replication queue.", e);
-      throw new IOException(e);
+      throw e;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/69fac916/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 0b44ba4..6b622ee 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -106,7 +106,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
   // total number of edits we replicated
   private AtomicLong totalReplicatedEdits = new AtomicLong(0);
   // The znode we currently play with
-  protected String peerClusterZnode;
+  protected String queueId;
   // Maximum number of retries before taking bold actions
   private int maxRetriesMultiplier;
   // Indicates if this particular source is running
@@ -142,14 +142,14 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
    * @param fs file system to use
    * @param manager replication manager to ping to
    * @param server the server for this region server
-   * @param peerClusterZnode the name of our znode
+   * @param queueId the id of our replication queue
    * @param clusterId unique UUID for the cluster
    * @param metrics metrics for replication source
    */
   @Override
   public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
       ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
-      String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
+      String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
       MetricsSource metrics) throws IOException {
     this.server = server;
     this.conf = HBaseConfiguration.create(conf);
@@ -168,8 +168,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     this.metrics = metrics;
     this.clusterId = clusterId;
 
-    this.peerClusterZnode = peerClusterZnode;
-    this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
+    this.queueId = queueId;
+    this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
     // ReplicationQueueInfo parses the peerId out of the znode for us
     this.peerId = this.replicationQueueInfo.getPeerId();
     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
@@ -179,7 +179,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
     this.totalBufferUsed = manager.getTotalBufferUsed();
     this.walFileLengthProvider = walFileLengthProvider;
-    LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId
+    LOG.info("queueId=" + queueId + ", ReplicationSource : " + peerId
         + ", currentBandwidth=" + this.currentBandwidth);
   }
 
@@ -217,12 +217,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
   @Override
   public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
       throws ReplicationException {
-    String peerId = peerClusterZnode;
-    if (peerId.contains("-")) {
-      // peerClusterZnode will be in the form peerId + "-" + rsZNode.
-      // A peerId will not have "-" in its name, see HBASE-11394
-      peerId = peerClusterZnode.split("-")[0];
-    }
     Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
     if (tableCFMap != null) {
       List<String> tableCfs = tableCFMap.get(tableName);
@@ -311,7 +305,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
           + replicationEndpoint.getClass().getName(), null, false);
-      this.manager.closeQueue(this);
+      this.manager.removeSource(this);
       return;
     }
     LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
@@ -356,7 +350,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     ReplicationSourceWALReader walReader =
         new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
     return (ReplicationSourceWALReader) Threads.setDaemonThreadRunning(walReader,
-      threadName + ".replicationSource.wal-reader." + walGroupId + "," + peerClusterZnode,
+      threadName + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
       getUncaughtExceptionHandler());
   }
 
@@ -450,7 +444,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
         LOG.error("Unexpected exception in ReplicationSource", e);
       }
     };
-    Threads.setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode,
+    Threads.setDaemonThreadRunning(this, n + ".replicationSource," + this.queueId,
       handler);
   }
 
@@ -466,9 +460,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
 
   public void terminate(String reason, Exception cause, boolean join) {
     if (cause == null) {
-      LOG.info("Closing source " + this.peerClusterZnode + " because: " + reason);
+      LOG.info("Closing source " + this.queueId + " because: " + reason);
     } else {
-      LOG.error("Closing source " + this.peerClusterZnode + " because an error occurred: " + reason,
+      LOG.error("Closing source " + this.queueId + " because an error occurred: " + reason,
         cause);
     }
     this.sourceRunning = false;
@@ -491,7 +485,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
           this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS);
         } catch (TimeoutException te) {
           LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :"
-              + this.peerClusterZnode,
+              + this.queueId,
             te);
         }
       }
@@ -499,8 +493,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
   }
 
   @Override
-  public String getPeerClusterZnode() {
-    return this.peerClusterZnode;
+  public String getQueueId() {
+    return this.queueId;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/69fac916/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
index 865a202..93e8331 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
@@ -32,8 +32,8 @@ public class ReplicationSourceFactory {
 
   private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceFactory.class);
 
-  static ReplicationSourceInterface create(Configuration conf, String peerId) {
-    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
+  static ReplicationSourceInterface create(Configuration conf, String queueId) {
+    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
     boolean isQueueRecovered = replicationQueueInfo.isQueueRecovered();
     ReplicationSourceInterface src;
     try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/69fac916/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 4f10c73..d7cf9a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -51,7 +51,7 @@ public interface ReplicationSourceInterface {
    */
   void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
       ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
-      String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
+      String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
       MetricsSource metrics) throws IOException;
 
   /**
@@ -96,11 +96,11 @@ public interface ReplicationSourceInterface {
   Path getCurrentPath();
 
   /**
-   * Get the id that the source is replicating to
+   * Get the queue id that the source is replicating to
    *
-   * @return peer cluster id
+   * @return queue id
    */
-  String getPeerClusterZnode();
+  String getQueueId();
 
   /**
    * Get the id that the source is replicating to.
