http://git-wip-us.apache.org/repos/asf/hbase/blob/3fc2f857/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 0214241..12a806b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -166,7 +166,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     this.clusterId = clusterId;
     this.walFileLengthProvider = walFileLengthProvider;
     this.replicationTracker.registerListener(this);
-    this.replicationPeers.getAllPeerIds();
     // It's preferable to failover 1 RS at a time, but with good zk servers
     // more could be processed at the same time.
     int nbWorkers = conf.getInt("replication.executor.workers", 1);
@@ -270,8 +269,8 @@ public class ReplicationSourceManager implements ReplicationListener {
     }
     List<ServerName> otherRegionServers = replicationTracker.getListOfRegionServers().stream()
         .map(ServerName::valueOf).collect(Collectors.toList());
-    LOG.info(
-      "Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers);
+    LOG.info("Current list of replicators: " + currentReplicators + " other RSs: " +
+        otherRegionServers);
 
     // Look if there's anything to process after a restart
     for (ServerName rs : currentReplicators) {
@@ -288,7 +287,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * The returned future is for adoptAbandonedQueues task.
    */
   Future<?> init() throws IOException, ReplicationException {
-    for (String id : this.replicationPeers.getConnectedPeerIds()) {
+    for (String id : this.replicationPeers.getAllPeerIds()) {
       addSource(id);
       if (replicationForBulkLoadDataEnabled) {
         // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
@@ -307,8 +306,8 @@ public class ReplicationSourceManager implements ReplicationListener {
    */
   @VisibleForTesting
   ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException {
-    ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
-    ReplicationPeer peer = replicationPeers.getConnectedPeer(id);
+    ReplicationPeerConfig peerConfig = replicationPeers.getPeerConfig(id);
+    ReplicationPeer peer = replicationPeers.getPeer(id);
     ReplicationSourceInterface src = getReplicationSource(id, peerConfig, peer);
     synchronized (this.walsById) {
       this.sources.add(src);
@@ -354,7 +353,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   public void deleteSource(String peerId, boolean closeConnection) {
     abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), peerId));
     if (closeConnection) {
-      this.replicationPeers.peerDisconnected(peerId);
+      this.replicationPeers.removePeer(peerId);
     }
   }
 
@@ -447,12 +446,12 @@ public class ReplicationSourceManager implements ReplicationListener {
     // update replication queues on ZK
     // synchronize on replicationPeers to avoid adding source for the to-be-removed peer
     synchronized (replicationPeers) {
-      for (String id : replicationPeers.getConnectedPeerIds()) {
+      for (String id : replicationPeers.getAllPeerIds()) {
         try {
           this.queueStorage.addWAL(server.getServerName(), id, logName);
         } catch (ReplicationException e) {
-          throw new IOException("Cannot add log to replication queue" +
-            " when creating a new source, queueId=" + id + ", filename=" + logName, e);
+          throw new IOException("Cannot add log to replication queue" +
+              " when creating a new source, queueId=" + id + ", filename=" + logName, e);
         }
       }
     }
@@ -597,7 +596,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   public void addPeer(String id) throws ReplicationException, IOException {
     LOG.info("Trying to add peer, peerId: " + id);
-    boolean added = this.replicationPeers.peerConnected(id);
+    boolean added = this.replicationPeers.addPeer(id);
     if (added) {
       LOG.info("Peer " + id + " connected success, trying to start the replication source thread.");
       addSource(id);
@@ -733,19 +732,25 @@ public class ReplicationSourceManager implements ReplicationListener {
       // there is not an actual peer defined corresponding to peerId for the failover.
       ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
       String actualPeerId = replicationQueueInfo.getPeerId();
-      ReplicationPeer peer = replicationPeers.getConnectedPeer(actualPeerId);
+
+      ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
+      if (peer == null) {
+        LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS +
+            ", peer is null");
+        abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId));
+        continue;
+      }
+
       ReplicationPeerConfig peerConfig = null;
       try {
-        peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
-      } catch (ReplicationException ex) {
-        LOG.warn("Received exception while getting replication peer config, skipping replay"
-            + ex);
-      }
-      if (peer == null || peerConfig == null) {
-        LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS);
+        peerConfig = replicationPeers.getPeerConfig(actualPeerId);
+      } catch (Exception e) {
+        LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS +
+            ", failed to read peer config", e);
         abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId));
         continue;
       }
+
       // track sources in walsByIdRecoveredQueues
       Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
       walsByIdRecoveredQueues.put(peerId, walsByGroup);
@@ -764,7 +769,7 @@ public class ReplicationSourceManager implements ReplicationListener {
       // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
       // see removePeer
       synchronized (oldsources) {
-        if (!replicationPeers.getConnectedPeerIds().contains(src.getPeerId())) {
+        if (!replicationPeers.getAllPeerIds().contains(src.getPeerId())) {
           src.terminate("Recovered queue doesn't belong to any current peer");
           closeRecoveredQueue(src);
           continue;
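
Note: the hunks above rename the ReplicationPeers read accessors (getConnectedPeerIds() -> getAllPeerIds(), getConnectedPeer() -> getPeer(), getReplicationPeerConfig() -> getPeerConfig()), replace peerConnected()/peerDisconnected() with addPeer()/removePeer(), and make the failover path skip a recovered queue whose peer is gone. A minimal sketch of a caller walking peers with the renamed accessors; the class, method, and logging below are illustrative only and not part of the patch:

    import org.apache.hadoop.hbase.replication.ReplicationPeer;
    import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
    import org.apache.hadoop.hbase.replication.ReplicationPeers;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Illustrative helper, not part of the patch.
    public final class PeerStateDumper {
      private static final Logger LOG = LoggerFactory.getLogger(PeerStateDumper.class);

      // Declared "throws Exception" broadly since this is only a sketch.
      public static void dump(ReplicationPeers replicationPeers) throws Exception {
        for (String peerId : replicationPeers.getAllPeerIds()) {    // was getConnectedPeerIds()
          ReplicationPeer peer = replicationPeers.getPeer(peerId);  // was getConnectedPeer()
          if (peer == null) {
            // Mirrors the new failover guard: a queue may still name a peer that no longer exists.
            LOG.warn("Peer " + peerId + " is null, skipping");
            continue;
          }
          try {
            // was getReplicationPeerConfig()
            ReplicationPeerConfig peerConfig = replicationPeers.getPeerConfig(peerId);
            LOG.info("Peer " + peerId + " cluster key: " + peerConfig.getClusterKey());
          } catch (Exception e) {
            LOG.warn("Failed to read config for peer " + peerId, e);
          }
        }
      }
    }

The null check before the config read follows the same order as the new failover hunk: consult the cached peer first, then treat the config lookup as best-effort.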
http://git-wip-us.apache.org/repos/asf/hbase/blob/3fc2f857/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
index e1eb822..08dd428 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -94,7 +94,7 @@ public class TestReplicationHFileCleaner {
     server = new DummyServer();
     conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
     HMaster.decorateMasterConfiguration(conf);
-    rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server);
+    rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf);
     rp.init();
     rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
     fs = FileSystem.get(conf);
@@ -108,7 +108,8 @@ public class TestReplicationHFileCleaner {
   @Before
   public void setup() throws ReplicationException, IOException {
     root = TEST_UTIL.getDataTestDirOnTestFS();
-    rp.registerPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey()));
+    rp.getPeerStorage().addPeer(peerId,
+      ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL.getClusterKey()).build(), true);
     rq.addPeerToHFileRefs(peerId);
   }
 
@@ -119,7 +120,7 @@ public class TestReplicationHFileCleaner {
     } catch (IOException e) {
       LOG.warn("Failed to delete files recursively from path " + root);
     }
-    rp.unregisterPeer(peerId);
+    rp.getPeerStorage().removePeer(peerId);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hbase/blob/3fc2f857/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
index 7b2e73f..53b8ba3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
@@ -19,9 +19,7 @@ package org.apache.hadoop.hbase.replication;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
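
The TestReplicationHFileCleaner changes above drop registerPeer()/unregisterPeer() in favour of going through rp.getPeerStorage(), and ReplicationPeerConfig is now built with its builder. A minimal sketch of that setup/teardown pattern, assuming an initialized ReplicationPeers instance; the peer id is a placeholder and the trailing boolean enables the peer on creation:

    import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
    import org.apache.hadoop.hbase.replication.ReplicationPeers;

    // Illustrative only, not part of the patch.
    public final class PeerStorageSetupSketch {
      // Declared "throws Exception" broadly since this is only a sketch.
      public static void addThenRemove(ReplicationPeers rp, String clusterKey) throws Exception {
        String peerId = "1"; // placeholder peer id
        // registerPeer(peerId, config) becomes a peer storage write with a built config.
        rp.getPeerStorage().addPeer(peerId,
          ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(), true);
        // unregisterPeer(peerId) likewise becomes a peer storage call.
        rp.getPeerStorage().removePeer(peerId);
      }
    }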
http://git-wip-us.apache.org/repos/asf/hbase/blob/3fc2f857/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
index de7e768..9f0c670 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
@@ -21,8 +21,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -43,6 +41,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
+import org.apache.zookeeper.KeeperException;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -77,10 +76,6 @@ public class TestReplicationTrackerZKImpl {
   private ReplicationTracker rt;
   private AtomicInteger rsRemovedCount;
   private String rsRemovedData;
-  private AtomicInteger plChangedCount;
-  private List<String> plChangedData;
-  private AtomicInteger peerRemovedCount;
-  private String peerRemovedData;
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -97,7 +92,7 @@ public class TestReplicationTrackerZKImpl {
     String fakeRs1 = ZNodePaths.joinZNode(zkw.znodePaths.rsZNode, "hostname1.example.org:1234");
     try {
       ZKClusterId.setClusterId(zkw, new ClusterId());
-      rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
+      rp = ReplicationFactory.getReplicationPeers(zkw, conf);
       rp.init();
       rt = ReplicationFactory.getReplicationTracker(zkw, rp, conf, zkw, new DummyServer(fakeRs1));
     } catch (Exception e) {
@@ -105,10 +100,6 @@ public class TestReplicationTrackerZKImpl {
     }
     rsRemovedCount = new AtomicInteger(0);
     rsRemovedData = "";
-    plChangedCount = new AtomicInteger(0);
-    plChangedData = new ArrayList<>();
-    peerRemovedCount = new AtomicInteger(0);
-    peerRemovedData = "";
   }
 
   @AfterClass
@@ -161,25 +152,22 @@
   @Test(timeout = 30000)
   public void testPeerNameControl() throws Exception {
     int exists = 0;
-    int hyphen = 0;
-    rp.registerPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
+    rp.getPeerStorage().addPeer("6",
+      ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true);
 
-    try{
-      rp.registerPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
-    }catch(IllegalArgumentException e){
-      exists++;
+    try {
+      rp.getPeerStorage().addPeer("6",
+        ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true);
+    } catch (ReplicationException e) {
+      if (e.getCause() instanceof KeeperException.NodeExistsException) {
+        exists++;
+      }
     }
-    try{
-      rp.registerPeer("6-ec2", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
-    }catch(IllegalArgumentException e){
-      hyphen++;
-    }
     assertEquals(1, exists);
-    assertEquals(1, hyphen);
 
     // clean up
-    rp.unregisterPeer("6");
+    rp.getPeerStorage().removePeer("6");
   }
 
   private class DummyReplicationListener implements ReplicationListener {
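
In testPeerNameControl above, a duplicate peer id now surfaces as a ReplicationException whose cause is a ZooKeeper NodeExistsException, rather than the IllegalArgumentException the old registerPeer() threw (the separate hyphenated-name check is gone). A small sketch of that duplicate check; the helper class, method name, and arguments are illustrative only:

    import org.apache.hadoop.hbase.replication.ReplicationException;
    import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
    import org.apache.hadoop.hbase.replication.ReplicationPeers;
    import org.apache.zookeeper.KeeperException;

    // Illustrative only, not part of the patch.
    public final class DuplicatePeerCheckSketch {
      /** @return true if the peer was newly created, false if it already existed. */
      public static boolean addPeerIfAbsent(ReplicationPeers rp, String peerId, String clusterKey)
          throws ReplicationException {
        ReplicationPeerConfig config =
          ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build();
        try {
          rp.getPeerStorage().addPeer(peerId, config, true);
          return true;
        } catch (ReplicationException e) {
          // The ZK-backed peer storage wraps the underlying NodeExistsException.
          if (e.getCause() instanceof KeeperException.NodeExistsException) {
            return false;
          }
          throw e;
        }
      }
    }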
http://git-wip-us.apache.org/repos/asf/hbase/blob/3fc2f857/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 1d3b6ea..77b2fb2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -389,7 +389,7 @@ public abstract class TestReplicationSourceManager {
     }
     Server s1 = new DummyServer("dummyserver1.example.org");
     ReplicationPeers rp1 =
-        ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1);
+        ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration());
     rp1.init();
 
     NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName());
@@ -585,7 +585,7 @@ public abstract class TestReplicationSourceManager {
   private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
       final boolean waitForSource) throws Exception {
     final ReplicationPeers rp = manager.getReplicationPeers();
-    rp.registerPeer(peerId, peerConfig);
+    rp.getPeerStorage().addPeer(peerId, peerConfig, true);
     try {
       manager.addPeer(peerId);
     } catch (Exception e) {
@@ -612,7 +612,7 @@ public abstract class TestReplicationSourceManager {
         }
         return true;
       } else {
-        return (rp.getConnectedPeer(peerId) != null);
+        return (rp.getPeer(peerId) != null);
       }
     });
   }
@@ -624,8 +624,8 @@ public abstract class TestReplicationSourceManager {
    */
   private void removePeerAndWait(final String peerId) throws Exception {
     final ReplicationPeers rp = manager.getReplicationPeers();
-    if (rp.getAllPeerIds().contains(peerId)) {
-      rp.unregisterPeer(peerId);
+    if (rp.getPeerStorage().listPeerIds().contains(peerId)) {
+      rp.getPeerStorage().removePeer(peerId);
       try {
         manager.removePeer(peerId);
       } catch (Exception e) {
@@ -635,10 +635,9 @@ public abstract class TestReplicationSourceManager {
     Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
       @Override
       public boolean evaluate() throws Exception {
-        List<String> peers = rp.getAllPeerIds();
-        return (!manager.getAllQueues().contains(peerId)) &&
-          (rp.getConnectedPeer(peerId) == null) && (!peers.contains(peerId)) &&
-          manager.getSource(peerId) == null;
+        Collection<String> peers = rp.getPeerStorage().listPeerIds();
+        return (!manager.getAllQueues().contains(peerId)) && (rp.getPeer(peerId) == null)
+            && (!peers.contains(peerId)) && manager.getSource(peerId) == null;
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3fc2f857/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java
index fc31c37..b755c32 100644
--- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java
+++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java
@@ -182,8 +182,7 @@ public class HBaseZKTestingUtility extends HBaseCommonTestingUtility {
   /**
    * Gets a ZKWatcher.
    */
-  public static ZKWatcher getZooKeeperWatcher(HBaseZKTestingUtility testUtil)
-      throws ZooKeeperConnectionException, IOException {
+  public static ZKWatcher getZooKeeperWatcher(HBaseZKTestingUtility testUtil) throws IOException {
     ZKWatcher zkw = new ZKWatcher(testUtil.getConfiguration(), "unittest", new Abortable() {
       boolean aborted = false;
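
removePeerAndWait() in the TestReplicationSourceManager hunks above now treats a peer as gone only when it is absent from both the peer storage (listPeerIds()) and the in-memory view (getPeer()). A minimal sketch of that combined check, mirroring the wait predicate; the class and method names are illustrative only:

    import java.util.Collection;

    import org.apache.hadoop.hbase.replication.ReplicationPeers;

    // Illustrative only, not part of the patch.
    public final class PeerRemovalCheckSketch {
      // Declared "throws Exception" broadly, matching the Waiter.Predicate signature used above.
      public static boolean peerFullyRemoved(ReplicationPeers rp, String peerId) throws Exception {
        Collection<String> storedPeerIds = rp.getPeerStorage().listPeerIds();
        // Absent from the persisted peer list and from the connected-peer cache.
        return !storedPeerIds.contains(peerId) && rp.getPeer(peerId) == null;
      }
    }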