Repository: hbase Updated Branches: refs/heads/master 03fe257a6 -> 744248c13
HBASE-16096 Properly remove the replication queue and peer znodes after calling ReplicationSourceManager.removePeer(). Signed-off-by: Elliott Clark <ecl...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/744248c1 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/744248c1 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/744248c1 Branch: refs/heads/master Commit: 744248c131d344e5ddab3cfe032aad919dc0de0f Parents: 03fe257 Author: Joseph Hwang <j...@fb.com> Authored: Thu Jun 30 15:18:33 2016 -0700 Committer: Elliott Clark <ecl...@apache.org> Committed: Fri Jul 22 18:11:41 2016 -0700 ---------------------------------------------------------------------- .../regionserver/ReplicationSourceManager.java | 15 +++++--- .../TestReplicationSourceManager.java | 40 ++++++++++++++++++++ 2 files changed, 50 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/744248c1/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index e2a232f..586aace 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -561,9 +561,10 @@ public class ReplicationSourceManager implements ReplicationListener { srcToRemove.add(src); } } - if (srcToRemove.size() == 0) { - LOG.error("The queue we wanted to close is missing " + id); - return; + if (srcToRemove.isEmpty()) { + LOG.error("The peer we wanted to remove is missing a ReplicationSourceInterface. " + + "This could mean that ReplicationSourceInterface initialization failed for this peer " + + "and that replication on this peer may not be caught up. peerId=" + id); } for (ReplicationSourceInterface toRemove : srcToRemove) { toRemove.terminate(terminateMessage); @@ -739,8 +740,6 @@ public class ReplicationSourceManager implements ReplicationListener { } } - - /** * Get the directory where wals are archived * @return the directory where wals are archived @@ -766,6 +765,12 @@ public class ReplicationSourceManager implements ReplicationListener { } /** + * Get the ReplicationPeers used by this ReplicationSourceManager + * @return the ReplicationPeers used by this ReplicationSourceManager + */ + public ReplicationPeers getReplicationPeers() {return this.replicationPeers;} + + /** * Get a string representation of all the sources' metrics */ public String getStats() { http://git-wip-us.apache.org/repos/asf/hbase/blob/744248c1/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 4442bbb..7696e95 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; @@ -424,6 +425,45 @@ public abstract class TestReplicationSourceManager { scopes.containsKey(f2)); } + /** + * Test whether calling removePeer() on a ReplicationSourceManager that failed on initializing the + * corresponding ReplicationSourceInterface correctly cleans up the corresponding + * replication queue and ReplicationPeer. + * See HBASE-16096. + * @throws Exception + */ + @Test + public void testPeerRemovalCleanup() throws Exception{ + String replicationSourceImplName = conf.get("replication.replicationsource.implementation"); + try { + DummyServer server = new DummyServer(); + ReplicationQueues rq = + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments( + server.getConfiguration(), server, server.getZooKeeper())); + rq.init(server.getServerName().toString()); + // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface + // initialization to throw an exception. + conf.set("replication.replicationsource.implementation", "fakeReplicationSourceImpl"); + ReplicationPeers rp = manager.getReplicationPeers(); + // Set up the znode and ReplicationPeer for the fake peer + rp.registerPeer("FakePeer", new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase")); + rp.peerConnected("FakePeer"); + // Have ReplicationSourceManager add the fake peer. It should fail to initialize a + // ReplicationSourceInterface. + List<String> fakePeers = new ArrayList<>(); + fakePeers.add("FakePeer"); + manager.peerListChanged(fakePeers); + // Create a replication queue for the fake peer + rq.addLog("FakePeer", "FakeFile"); + // Removing the peer should remove both the replication queue and the ReplicationPeer + manager.removePeer("FakePeer"); + assertFalse(rq.getAllQueues().contains("FakePeer")); + assertNull(rp.getConnectedPeer("FakePeer")); + } finally { + conf.set("replication.replicationsource.implementation", replicationSourceImplName); + } + } + private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) { // 1. Create store files for the families Map<byte[], List<Path>> storeFiles = new HashMap<>(1);