HBASE-16681: Flaky TestReplicationSourceManagerZkImpl
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2c7211ec Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2c7211ec Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2c7211ec Branch: refs/heads/hbase-12439 Commit: 2c7211ec4bd6d83e498ddc82e60d70f411140ee0 Parents: 97c1333 Author: Ashu Pachauri <ashu210...@gmail.com> Authored: Fri Sep 23 16:04:08 2016 -0700 Committer: Apekshit Sharma <a...@apache.org> Committed: Thu Oct 6 16:26:38 2016 -0700 ---------------------------------------------------------------------- .../TestReplicationSourceManager.java | 59 +++++++++++++++----- 1 file changed, 46 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/2c7211ec/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 7174d5f..c074048 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -56,7 +57,9 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -64,11 +67,13 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescr import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; +import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -437,28 +442,45 @@ public abstract class TestReplicationSourceManager { String replicationSourceImplName = conf.get("replication.replicationsource.implementation"); try { DummyServer server = new DummyServer(); - ReplicationQueues rq = + final ReplicationQueues rq = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments( server.getConfiguration(), server, server.getZooKeeper())); rq.init(server.getServerName().toString()); // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface // initialization to throw an exception. - conf.set("replication.replicationsource.implementation", "fakeReplicationSourceImpl"); - ReplicationPeers rp = manager.getReplicationPeers(); + conf.set("replication.replicationsource.implementation", + FailInitializeDummyReplicationSource.class.getName()); + final ReplicationPeers rp = manager.getReplicationPeers(); // Set up the znode and ReplicationPeer for the fake peer rp.registerPeer("FakePeer", new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase")); - rp.peerConnected("FakePeer"); - // Have ReplicationSourceManager add the fake peer. It should fail to initialize a - // ReplicationSourceInterface. - List<String> fakePeers = new ArrayList<>(); - fakePeers.add("FakePeer"); - manager.peerListChanged(fakePeers); + // Wait for the peer to get created and connected + Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + return (rp.getConnectedPeer("FakePeer") != null); + } + }); + + // Make sure that the replication source was not initialized + List<ReplicationSourceInterface> sources = manager.getSources(); + for (ReplicationSourceInterface source : sources) { + assertNotEquals("FakePeer", source.getPeerClusterId()); + } + // Create a replication queue for the fake peer rq.addLog("FakePeer", "FakeFile"); - // Removing the peer should remove both the replication queue and the ReplicationPeer - manager.removePeer("FakePeer"); - assertFalse(rq.getAllQueues().contains("FakePeer")); - assertNull(rp.getConnectedPeer("FakePeer")); + // Unregister peer, this should remove the peer and clear all queues associated with it + // Need to wait for the ReplicationTracker to pick up the changes and notify listeners. + rp.unregisterPeer("FakePeer"); + Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + List<String> peers = rp.getAllPeerIds(); + return (!rq.getAllQueues().contains("FakePeer")) + && (rp.getConnectedPeer("FakePeer") == null) + && (!peers.contains("FakePeer")); + } + }); } finally { conf.set("replication.replicationsource.implementation", replicationSourceImplName); } @@ -553,6 +575,17 @@ public abstract class TestReplicationSourceManager { } } + static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy { + + @Override + public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, + ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId, + UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics) + throws IOException { + throw new IOException("Failing deliberately"); + } + } + static class DummyServer implements Server { String hostname;