http://git-wip-us.apache.org/repos/asf/hbase/blob/3fc2f857/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 0214241..12a806b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -166,7 +166,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     this.clusterId = clusterId;
     this.walFileLengthProvider = walFileLengthProvider;
     this.replicationTracker.registerListener(this);
-    this.replicationPeers.getAllPeerIds();
     // It's preferable to failover 1 RS at a time, but with good zk servers
     // more could be processed at the same time.
     int nbWorkers = conf.getInt("replication.executor.workers", 1);
@@ -270,8 +269,8 @@ public class ReplicationSourceManager implements ReplicationListener {
     }
    List<ServerName> otherRegionServers = replicationTracker.getListOfRegionServers().stream()
         .map(ServerName::valueOf).collect(Collectors.toList());
-    LOG.info(
-      "Current list of replicators: " + currentReplicators + " other RSs: " + 
otherRegionServers);
+    LOG.info("Current list of replicators: " + currentReplicators + " other 
RSs: "
+        + otherRegionServers);
 
     // Look if there's anything to process after a restart
     for (ServerName rs : currentReplicators) {
@@ -288,7 +287,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * The returned future is for adoptAbandonedQueues task.
    */
   Future<?> init() throws IOException, ReplicationException {
-    for (String id : this.replicationPeers.getConnectedPeerIds()) {
+    for (String id : this.replicationPeers.getAllPeerIds()) {
       addSource(id);
       if (replicationForBulkLoadDataEnabled) {
        // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
@@ -307,8 +306,8 @@ public class ReplicationSourceManager implements ReplicationListener {
    */
   @VisibleForTesting
  ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException {
-    ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
-    ReplicationPeer peer = replicationPeers.getConnectedPeer(id);
+    ReplicationPeerConfig peerConfig = replicationPeers.getPeerConfig(id);
+    ReplicationPeer peer = replicationPeers.getPeer(id);
    ReplicationSourceInterface src = getReplicationSource(id, peerConfig, peer);
     synchronized (this.walsById) {
       this.sources.add(src);
@@ -354,7 +353,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   public void deleteSource(String peerId, boolean closeConnection) {
    abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), peerId));
     if (closeConnection) {
-      this.replicationPeers.peerDisconnected(peerId);
+      this.replicationPeers.removePeer(peerId);
     }
   }
 
@@ -447,12 +446,12 @@ public class ReplicationSourceManager implements ReplicationListener {
     // update replication queues on ZK
    // synchronize on replicationPeers to avoid adding source for the to-be-removed peer
     synchronized (replicationPeers) {
-      for (String id : replicationPeers.getConnectedPeerIds()) {
+      for (String id : replicationPeers.getAllPeerIds()) {
         try {
           this.queueStorage.addWAL(server.getServerName(), id, logName);
         } catch (ReplicationException e) {
-          throw new IOException("Cannot add log to replication queue" +
-            " when creating a new source, queueId=" + id + ", filename=" + 
logName, e);
+          throw new IOException("Cannot add log to replication queue"
+              + " when creating a new source, queueId=" + id + ", filename=" + 
logName, e);
         }
       }
     }
@@ -597,7 +596,7 @@ public class ReplicationSourceManager implements ReplicationListener {
 
   public void addPeer(String id) throws ReplicationException, IOException {
     LOG.info("Trying to add peer, peerId: " + id);
-    boolean added = this.replicationPeers.peerConnected(id);
+    boolean added = this.replicationPeers.addPeer(id);
     if (added) {
       LOG.info("Peer " + id + " connected success, trying to start the 
replication source thread.");
       addSource(id);
@@ -733,19 +732,25 @@ public class ReplicationSourceManager implements ReplicationListener {
          // there is not an actual peer defined corresponding to peerId for the failover.
          ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
          String actualPeerId = replicationQueueInfo.getPeerId();
-          ReplicationPeer peer = replicationPeers.getConnectedPeer(actualPeerId);
+
+          ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
+          if (peer == null) {
+            LOG.warn("Skipping failover for peer:" + actualPeerId + " of node 
" + deadRS
+                + ", peer is null");
+            abortWhenFail(() -> 
queueStorage.removeQueue(server.getServerName(), peerId));
+            continue;
+          }
+
           ReplicationPeerConfig peerConfig = null;
           try {
-            peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
-          } catch (ReplicationException ex) {
-            LOG.warn("Received exception while getting replication peer 
config, skipping replay"
-                + ex);
-          }
-          if (peer == null || peerConfig == null) {
-            LOG.warn("Skipping failover for peer:" + actualPeerId + " of node 
" + deadRS);
+            peerConfig = replicationPeers.getPeerConfig(actualPeerId);
+          } catch (Exception e) {
+            LOG.warn("Skipping failover for peer:" + actualPeerId + " of node 
" + deadRS
+                + ", failed to read peer config", e);
             abortWhenFail(() -> 
queueStorage.removeQueue(server.getServerName(), peerId));
             continue;
           }
+
           // track sources in walsByIdRecoveredQueues
           Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
           walsByIdRecoveredQueues.put(peerId, walsByGroup);
@@ -764,7 +769,7 @@ public class ReplicationSourceManager implements ReplicationListener {
          // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
           // see removePeer
           synchronized (oldsources) {
-            if (!replicationPeers.getConnectedPeerIds().contains(src.getPeerId())) {
+            if (!replicationPeers.getAllPeerIds().contains(src.getPeerId())) {
               src.terminate("Recovered queue doesn't belong to any current 
peer");
               closeRecoveredQueue(src);
               continue;
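
For readers tracking the API change: the hunks above swap the old connected-peer accessors (getConnectedPeerIds/getConnectedPeer/getReplicationPeerConfig/peerConnected/peerDisconnected) for getAllPeerIds/getPeer/getPeerConfig/addPeer/removePeer on ReplicationPeers. Below is a minimal sketch of how a source-manager-style caller might drive the renamed API; the helper class and method are hypothetical, and only the ReplicationPeers calls are taken from the diff.

import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;

public class PeerLookupSketch {
  // Mirrors the addSource()/failover paths above: resolve the peer and its config,
  // and skip the work when the peer has been removed concurrently.
  static void startSourceIfPeerPresent(ReplicationPeers replicationPeers, String peerId)
      throws Exception {
    if (!replicationPeers.getAllPeerIds().contains(peerId)) {
      return; // peer is gone; the caller cleans up the replication queue
    }
    ReplicationPeer peer = replicationPeers.getPeer(peerId);
    if (peer == null) {
      return; // raced with removePeer(); same cleanup as in the failover hunk
    }
    ReplicationPeerConfig peerConfig;
    try {
      peerConfig = replicationPeers.getPeerConfig(peerId);
    } catch (Exception e) {
      return; // could not read the peer config; skip, as the failover path does
    }
    // ... hand peer and peerConfig to a ReplicationSourceInterface here ...
  }
}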

http://git-wip-us.apache.org/repos/asf/hbase/blob/3fc2f857/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
index e1eb822..08dd428 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -94,7 +94,7 @@ public class TestReplicationHFileCleaner {
     server = new DummyServer();
     conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
     HMaster.decorateMasterConfiguration(conf);
-    rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server);
+    rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf);
     rp.init();
    rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
     fs = FileSystem.get(conf);
@@ -108,7 +108,8 @@ public class TestReplicationHFileCleaner {
   @Before
   public void setup() throws ReplicationException, IOException {
     root = TEST_UTIL.getDataTestDirOnTestFS();
-    rp.registerPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey()));
+    rp.getPeerStorage().addPeer(peerId,
+      ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL.getClusterKey()).build(), true);
     rq.addPeerToHFileRefs(peerId);
   }
 
@@ -119,7 +120,7 @@ public class TestReplicationHFileCleaner {
     } catch (IOException e) {
       LOG.warn("Failed to delete files recursively from path " + root);
     }
-    rp.unregisterPeer(peerId);
+    rp.getPeerStorage().removePeer(peerId);
   }
 
   @Test
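
A minimal sketch of the peer registration flow the test now uses: ReplicationFactory.getReplicationPeers takes only the ZKWatcher and Configuration, and peers are added and removed through the peer storage with a builder-built ReplicationPeerConfig plus an explicit enabled flag. The helper class and its wiring are hypothetical; only the factory and peer-storage calls come from the hunks above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;

public class PeerStorageSetupSketch {
  static ReplicationPeers setUpPeer(ZKWatcher zk, Configuration conf, String peerId,
      String clusterKey) throws Exception {
    // The Abortable argument is gone from the factory method in this patch.
    ReplicationPeers rp = ReplicationFactory.getReplicationPeers(zk, conf);
    rp.init();
    // Registration now goes through the peer storage rather than rp.registerPeer().
    rp.getPeerStorage().addPeer(peerId,
      ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(), true);
    return rp;
  }

  static void tearDownPeer(ReplicationPeers rp, String peerId) throws Exception {
    rp.getPeerStorage().removePeer(peerId);
  }
}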

http://git-wip-us.apache.org/repos/asf/hbase/blob/3fc2f857/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
index 7b2e73f..53b8ba3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
@@ -19,9 +19,7 @@ package org.apache.hadoop.hbase.replication;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/hbase/blob/3fc2f857/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
index de7e768..9f0c670 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
@@ -21,8 +21,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -43,6 +41,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
+import org.apache.zookeeper.KeeperException;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -77,10 +76,6 @@ public class TestReplicationTrackerZKImpl {
   private ReplicationTracker rt;
   private AtomicInteger rsRemovedCount;
   private String rsRemovedData;
-  private AtomicInteger plChangedCount;
-  private List<String> plChangedData;
-  private AtomicInteger peerRemovedCount;
-  private String peerRemovedData;
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -97,7 +92,7 @@ public class TestReplicationTrackerZKImpl {
    String fakeRs1 = ZNodePaths.joinZNode(zkw.znodePaths.rsZNode, "hostname1.example.org:1234");
     try {
       ZKClusterId.setClusterId(zkw, new ClusterId());
-      rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
+      rp = ReplicationFactory.getReplicationPeers(zkw, conf);
       rp.init();
      rt = ReplicationFactory.getReplicationTracker(zkw, rp, conf, zkw, new DummyServer(fakeRs1));
     } catch (Exception e) {
@@ -105,10 +100,6 @@ public class TestReplicationTrackerZKImpl {
     }
     rsRemovedCount = new AtomicInteger(0);
     rsRemovedData = "";
-    plChangedCount = new AtomicInteger(0);
-    plChangedData = new ArrayList<>();
-    peerRemovedCount = new AtomicInteger(0);
-    peerRemovedData = "";
   }
 
   @AfterClass
@@ -161,25 +152,22 @@ public class TestReplicationTrackerZKImpl {
   @Test(timeout = 30000)
   public void testPeerNameControl() throws Exception {
     int exists = 0;
-    int hyphen = 0;
-    rp.registerPeer("6", new 
ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
+    rp.getPeerStorage().addPeer("6",
+      
ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(),
 true);
 
-    try{
-      rp.registerPeer("6", new 
ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
-    }catch(IllegalArgumentException e){
-      exists++;
+    try {
+      rp.getPeerStorage().addPeer("6",
+        ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true);
+    } catch (ReplicationException e) {
+      if (e.getCause() instanceof KeeperException.NodeExistsException) {
+        exists++;
+      }
     }
 
-    try{
-      rp.registerPeer("6-ec2", new 
ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
-    }catch(IllegalArgumentException e){
-      hyphen++;
-    }
     assertEquals(1, exists);
-    assertEquals(1, hyphen);
 
     // clean up
-    rp.unregisterPeer("6");
+    rp.getPeerStorage().removePeer("6");
   }
 
   private class DummyReplicationListener implements ReplicationListener {
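
A short sketch of the duplicate-peer handling the rewritten testPeerNameControl exercises: with the ZK-backed peer storage, re-adding an existing peer id surfaces as a ReplicationException whose cause is KeeperException.NodeExistsException, instead of the IllegalArgumentException the old registerPeer() threw. The helper below is hypothetical; the calls and exception shape are taken from the test.

import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.zookeeper.KeeperException;

public class DuplicatePeerSketch {
  // Returns true if the peer was newly created, false if it already existed.
  static boolean addPeerIfAbsent(ReplicationPeers rp, String peerId, String clusterKey)
      throws Exception {
    try {
      rp.getPeerStorage().addPeer(peerId,
        ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(), true);
      return true;
    } catch (ReplicationException e) {
      // Duplicate ids show up as a NodeExistsException wrapped in a ReplicationException.
      if (e.getCause() instanceof KeeperException.NodeExistsException) {
        return false;
      }
      throw e;
    }
  }
}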

http://git-wip-us.apache.org/repos/asf/hbase/blob/3fc2f857/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 1d3b6ea..77b2fb2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -389,7 +389,7 @@ public abstract class TestReplicationSourceManager {
     }
     Server s1 = new DummyServer("dummyserver1.example.org");
     ReplicationPeers rp1 =
-        ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1);
+        ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration());
     rp1.init();
     NodeFailoverWorker w1 =
         manager.new NodeFailoverWorker(server.getServerName());
@@ -585,7 +585,7 @@ public abstract class TestReplicationSourceManager {
  private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
       final boolean waitForSource) throws Exception {
     final ReplicationPeers rp = manager.getReplicationPeers();
-    rp.registerPeer(peerId, peerConfig);
+    rp.getPeerStorage().addPeer(peerId, peerConfig, true);
     try {
       manager.addPeer(peerId);
     } catch (Exception e) {
@@ -612,7 +612,7 @@ public abstract class TestReplicationSourceManager {
         }
         return true;
       } else {
-        return (rp.getConnectedPeer(peerId) != null);
+        return (rp.getPeer(peerId) != null);
       }
     });
   }
@@ -624,8 +624,8 @@ public abstract class TestReplicationSourceManager {
    */
   private void removePeerAndWait(final String peerId) throws Exception {
     final ReplicationPeers rp = manager.getReplicationPeers();
-    if (rp.getAllPeerIds().contains(peerId)) {
-      rp.unregisterPeer(peerId);
+    if (rp.getPeerStorage().listPeerIds().contains(peerId)) {
+      rp.getPeerStorage().removePeer(peerId);
       try {
         manager.removePeer(peerId);
       } catch (Exception e) {
@@ -635,10 +635,9 @@ public abstract class TestReplicationSourceManager {
     Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
       @Override
       public boolean evaluate() throws Exception {
-        List<String> peers = rp.getAllPeerIds();
-        return (!manager.getAllQueues().contains(peerId)) &&
-          (rp.getConnectedPeer(peerId) == null) && (!peers.contains(peerId)) &&
-          manager.getSource(peerId) == null;
+        Collection<String> peers = rp.getPeerStorage().listPeerIds();
+        return (!manager.getAllQueues().contains(peerId)) && (rp.getPeer(peerId) == null)
+            && (!peers.contains(peerId)) && manager.getSource(peerId) == null;
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3fc2f857/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java
index fc31c37..b755c32 100644
--- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java
+++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java
@@ -182,8 +182,7 @@ public class HBaseZKTestingUtility extends HBaseCommonTestingUtility {
   /**
    * Gets a ZKWatcher.
    */
-  public static ZKWatcher getZooKeeperWatcher(HBaseZKTestingUtility testUtil)
-      throws ZooKeeperConnectionException, IOException {
+  public static ZKWatcher getZooKeeperWatcher(HBaseZKTestingUtility testUtil) throws IOException {
    ZKWatcher zkw = new ZKWatcher(testUtil.getConfiguration(), "unittest", new Abortable() {
       boolean aborted = false;
 
