Repository: hbase Updated Branches: refs/heads/master 641c87ddf -> 31978c31b
http://git-wip-us.apache.org/repos/asf/hbase/blob/31978c31/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 6d75fec..4a36e13 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -70,7 +70,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; -import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage; +import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; @@ -170,9 +170,9 @@ public abstract class TestReplicationSourceManager { + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1")); ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state"); ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state", - ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES); + ReplicationUtils.PEER_STATE_ENABLED_BYTES); ZKUtil.createWithParents(zkw, "/hbase/replication/state"); - ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES); + ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationUtils.PEER_STATE_ENABLED_BYTES); 
ZKClusterId.setClusterId(zkw, new ClusterId()); FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir()); http://git-wip-us.apache.org/repos/asf/hbase/blob/31978c31/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateBasic.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateBasic.java new file mode 100644 index 0000000..461420e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateBasic.java @@ -0,0 +1,370 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication.storage; + +import static org.hamcrest.CoreMatchers.hasItems; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerImpl; +import org.apache.hadoop.hbase.replication.ReplicationPeers; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationUtils; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; +import org.apache.zookeeper.KeeperException; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; + +/** + * White box testing for replication state interfaces. Implementations should extend this class, and + * initialize the interfaces properly. 
+ */ +public abstract class TestReplicationStateBasic { + + private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class); + + protected ReplicationQueueStorage rqs; + protected ServerName server1 = ServerName.valueOf("hostname1.example.org", 1234, 12345); + protected ServerName server2 = ServerName.valueOf("hostname2.example.org", 1234, 12345); + protected ServerName server3 = ServerName.valueOf("hostname3.example.org", 1234, 12345); + protected ReplicationPeers rp; + protected static final String ID_ONE = "1"; + protected static final String ID_TWO = "2"; + protected static String KEY_ONE; + protected static String KEY_TWO; + + // For testing when we try to replicate to ourself + protected String OUR_KEY; + + protected static int zkTimeoutCount; + protected static final int ZK_MAX_COUNT = 300; + protected static final int ZK_SLEEP_INTERVAL = 100; // millis + + @Test + public void testReplicationQueueStorage() throws ReplicationException { + // Test methods with empty state + assertEquals(0, rqs.getListOfReplicators().size()); + assertTrue(rqs.getWALsInQueue(server1, "qId1").isEmpty()); + assertTrue(rqs.getAllQueues(server1).isEmpty()); + + /* + * Set up data Two replicators: -- server1: three queues with 0, 1 and 2 log files each -- + * server2: zero queues + */ + rqs.addWAL(server1, "qId1", "trash"); + rqs.removeWAL(server1, "qId1", "trash"); + rqs.addWAL(server1,"qId2", "filename1"); + rqs.addWAL(server1,"qId3", "filename2"); + rqs.addWAL(server1,"qId3", "filename3"); + rqs.addWAL(server2,"trash", "trash"); + rqs.removeQueue(server2,"trash"); + + List<ServerName> reps = rqs.getListOfReplicators(); + assertEquals(2, reps.size()); + assertTrue(server1.getServerName(), reps.contains(server1)); + assertTrue(server2.getServerName(), reps.contains(server2)); + + assertTrue(rqs.getWALsInQueue(ServerName.valueOf("bogus", 12345, 12345), "bogus").isEmpty()); + assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty()); + assertEquals(0, 
rqs.getWALsInQueue(server1, "qId1").size()); + assertEquals(1, rqs.getWALsInQueue(server1, "qId2").size()); + assertEquals("filename1", rqs.getWALsInQueue(server1, "qId2").get(0)); + + assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 12345, -1L)).isEmpty()); + assertEquals(0, rqs.getAllQueues(server2).size()); + List<String> list = rqs.getAllQueues(server1); + assertEquals(3, list.size()); + assertTrue(list.contains("qId2")); + assertTrue(list.contains("qId3")); + } + + private void removeAllQueues(ServerName serverName) throws ReplicationException { + for (String queue: rqs.getAllQueues(serverName)) { + rqs.removeQueue(serverName, queue); + } + } + @Test + public void testReplicationQueues() throws ReplicationException { + // Initialize ReplicationPeer so we can add peers (we don't transfer lone queues) + rp.init(); + + rqs.removeQueue(server1, "bogus"); + rqs.removeWAL(server1, "bogus", "bogus"); + removeAllQueues(server1); + assertEquals(0, rqs.getAllQueues(server1).size()); + assertEquals(0, rqs.getWALPosition(server1, "bogus", "bogus")); + assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty()); + assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 1234, 12345)).isEmpty()); + + populateQueues(); + + assertEquals(3, rqs.getListOfReplicators().size()); + assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size()); + assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size()); + assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0")); + rqs.setWALPosition(server3, "qId5", "filename4", 354L, null); + assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4")); + + assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size()); + assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size()); + assertEquals(0, rqs.getAllQueues(server1).size()); + assertEquals(1, rqs.getAllQueues(server2).size()); + assertEquals(5, rqs.getAllQueues(server3).size()); + + assertEquals(0, rqs.getAllQueues(server1).size()); + 
rqs.removeReplicatorIfQueueIsEmpty(server1); + assertEquals(2, rqs.getListOfReplicators().size()); + + List<String> queues = rqs.getAllQueues(server3); + assertEquals(5, queues.size()); + for (String queue : queues) { + rqs.claimQueue(server3, queue, server2); + } + rqs.removeReplicatorIfQueueIsEmpty(server3); + assertEquals(1, rqs.getListOfReplicators().size()); + + assertEquals(6, rqs.getAllQueues(server2).size()); + removeAllQueues(server2); + rqs.removeReplicatorIfQueueIsEmpty(server2); + assertEquals(0, rqs.getListOfReplicators().size()); + } + + @Test + public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException { + rp.init(); + + List<Pair<Path, Path>> files1 = new ArrayList<>(3); + files1.add(new Pair<>(null, new Path("file_1"))); + files1.add(new Pair<>(null, new Path("file_2"))); + files1.add(new Pair<>(null, new Path("file_3"))); + assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty()); + assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size()); + rp.getPeerStorage().addPeer(ID_ONE, + ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true); + rqs.addPeerToHFileRefs(ID_ONE); + rqs.addHFileRefs(ID_ONE, files1); + assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size()); + assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size()); + List<String> hfiles2 = new ArrayList<>(files1.size()); + for (Pair<Path, Path> p : files1) { + hfiles2.add(p.getSecond().getName()); + } + String removedString = hfiles2.remove(0); + rqs.removeHFileRefs(ID_ONE, hfiles2); + assertEquals(1, rqs.getReplicableHFiles(ID_ONE).size()); + hfiles2 = new ArrayList<>(1); + hfiles2.add(removedString); + rqs.removeHFileRefs(ID_ONE, hfiles2); + assertEquals(0, rqs.getReplicableHFiles(ID_ONE).size()); + rp.getPeerStorage().removePeer(ID_ONE); + } + + @Test + public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException { + rp.init(); + rp.getPeerStorage().addPeer(ID_ONE, + 
ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true); + rqs.addPeerToHFileRefs(ID_ONE); + rp.getPeerStorage().addPeer(ID_TWO, + ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true); + rqs.addPeerToHFileRefs(ID_TWO); + + List<Pair<Path, Path>> files1 = new ArrayList<>(3); + files1.add(new Pair<>(null, new Path("file_1"))); + files1.add(new Pair<>(null, new Path("file_2"))); + files1.add(new Pair<>(null, new Path("file_3"))); + rqs.addHFileRefs(ID_ONE, files1); + rqs.addHFileRefs(ID_TWO, files1); + assertEquals(2, rqs.getAllPeersFromHFileRefsQueue().size()); + assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size()); + assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size()); + + rp.getPeerStorage().removePeer(ID_ONE); + rqs.removePeerFromHFileRefs(ID_ONE); + assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size()); + assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty()); + assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size()); + + rp.getPeerStorage().removePeer(ID_TWO); + rqs.removePeerFromHFileRefs(ID_TWO); + assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size()); + assertTrue(rqs.getReplicableHFiles(ID_TWO).isEmpty()); + } + + @Test + public void testReplicationPeers() throws Exception { + rp.init(); + + try { + rp.getPeerStorage().setPeerState("bogus", true); + fail("Should have thrown an ReplicationException when passed a non-exist bogus peerId"); + } catch (ReplicationException e) { + } + try { + rp.getPeerStorage().setPeerState("bogus", false); + fail("Should have thrown an ReplicationException when passed a non-exist bogus peerId"); + } catch (ReplicationException e) { + } + + try { + assertFalse(rp.addPeer("bogus")); + fail("Should have thrown an ReplicationException when creating a bogus peerId " + + "with null peer config"); + } catch (ReplicationException e) { + } + + assertNumberOfPeers(0); + + // Add some peers + rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true); 
+ assertNumberOfPeers(1); + rp.getPeerStorage().addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), true); + assertNumberOfPeers(2); + + assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationUtils + .getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), rp.getConf()))); + rp.getPeerStorage().removePeer(ID_ONE); + rp.removePeer(ID_ONE); + assertNumberOfPeers(1); + + // Add one peer + rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true); + rp.addPeer(ID_ONE); + assertNumberOfPeers(2); + assertTrue(rp.getPeer(ID_ONE).isPeerEnabled()); + rp.getPeerStorage().setPeerState(ID_ONE, false); + // now we do not rely on zk watcher to trigger the state change so we need to trigger it + // manually... + ReplicationPeerImpl peer = rp.getPeer(ID_ONE); + rp.refreshPeerState(peer.getId()); + assertEquals(PeerState.DISABLED, peer.getPeerState()); + assertConnectedPeerStatus(false, ID_ONE); + rp.getPeerStorage().setPeerState(ID_ONE, true); + // now we do not rely on zk watcher to trigger the state change so we need to trigger it + // manually... 
+ rp.refreshPeerState(peer.getId()); + assertEquals(PeerState.ENABLED, peer.getPeerState()); + assertConnectedPeerStatus(true, ID_ONE); + + // Disconnect peer + rp.removePeer(ID_ONE); + assertNumberOfPeers(2); + } + + private String getFileName(String base, int i) { + return String.format(base + "-%04d", i); + } + + @Test + public void testPersistLogPositionAndSeqIdAtomically() throws Exception { + ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000); + assertTrue(rqs.getAllQueues(serverName1).isEmpty()); + String queue1 = "1"; + String region0 = "region0", region1 = "region1"; + for (int i = 0; i < 10; i++) { + rqs.addWAL(serverName1, queue1, getFileName("file1", i)); + } + List<String> queueIds = rqs.getAllQueues(serverName1); + assertEquals(1, queueIds.size()); + assertThat(queueIds, hasItems("1")); + + List<String> wals1 = rqs.getWALsInQueue(serverName1, queue1); + assertEquals(10, wals1.size()); + for (int i = 0; i < 10; i++) { + assertThat(wals1, hasItems(getFileName("file1", i))); + } + + for (int i = 0; i < 10; i++) { + assertEquals(0, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i))); + } + assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region0, queue1)); + assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region1, queue1)); + + for (int i = 0; i < 10; i++) { + rqs.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100, + ImmutableMap.of(region0, i * 100L, region1, (i + 1) * 100L)); + } + + for (int i = 0; i < 10; i++) { + assertEquals((i + 1) * 100, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i))); + } + assertEquals(900L, rqs.getLastSequenceId(region0, queue1)); + assertEquals(1000L, rqs.getLastSequenceId(region1, queue1)); + } + + protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception { + // we can first check if the value was changed in the store, if it wasn't then fail right away + if (status != 
rp.getPeerStorage().isPeerEnabled(peerId)) { + fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK"); + } + while (true) { + if (status == rp.getPeer(peerId).isPeerEnabled()) { + return; + } + if (zkTimeoutCount < ZK_MAX_COUNT) { + LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status + + ", sleeping and trying again."); + Thread.sleep(ZK_SLEEP_INTERVAL); + } else { + fail("Timed out waiting for ConnectedPeerStatus to be " + status); + } + } + } + + protected void assertNumberOfPeers(int total) throws ReplicationException { + assertEquals(total, rp.getPeerStorage().listPeerIds().size()); + } + + /* + * three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2, + * 3, 4, 5 log files respectively + */ + protected void populateQueues() throws ReplicationException { + rqs.addWAL(server1, "trash", "trash"); + rqs.removeQueue(server1, "trash"); + + rqs.addWAL(server2, "qId1", "trash"); + rqs.removeWAL(server2, "qId1", "trash"); + + for (int i = 1; i < 6; i++) { + for (int j = 0; j < i; j++) { + rqs.addWAL(server3, "qId" + i, "filename" + j); + } + // Add peers for the corresponding queues so they are not orphans + rp.getPeerStorage().addPeer("qId" + i, + ReplicationPeerConfig.newBuilder().setClusterKey("localhost:2818:/bogus" + i).build(), + true); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/31978c31/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateTableImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateTableImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateTableImpl.java new file mode 100644 index 0000000..d073669 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateTableImpl.java @@ -0,0 
+1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.storage; + +import java.io.IOException; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterId; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.replication.TableReplicationPeerStorage; +import org.apache.hadoop.hbase.replication.TableReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.TableReplicationStorageBase; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.zookeeper.ZKClusterId; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; 
+import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.zookeeper.KeeperException; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestReplicationStateTableImpl extends TestReplicationStateBasic { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReplicationStateTableImpl.class); + + private static Configuration conf; + private static HBaseTestingUtility utility = new HBaseTestingUtility(); + private static ZKWatcher zkw; + private static Connection connection; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf = utility.getConfiguration(); + conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); + conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345"); + utility.startMiniCluster(); + + // After the HBase Mini cluster startup, we set the storage implementation to table based + // implementation. Otherwise, we cannot setup the HBase Mini Cluster because the master will + // list peers before finish its initialization, and if master cannot finish initialization, the + // meta cannot be online, in other hand, if meta cannot be online, the list peers never success + // when using table based replication. a dead loop happen. + // Our UTs are written for testing storage layer, so no problem here. 
+ conf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL, + TableReplicationPeerStorage.class.getName()); + conf.set(ReplicationStorageFactory.REPLICATION_QUEUE_STORAGE_IMPL, + TableReplicationQueueStorage.class.getName()); + + zkw = utility.getZooKeeperWatcher(); + connection = ConnectionFactory.createConnection(conf); + + KEY_ONE = initPeerClusterState("/hbase1"); + KEY_TWO = initPeerClusterState("/hbase2"); + } + + private static String initPeerClusterState(String baseZKNode) + throws IOException, KeeperException { + // Add a dummy region server and set up the cluster id + Configuration testConf = new Configuration(conf); + testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode); + ZKWatcher zkw1 = new ZKWatcher(testConf, "test1", null); + String fakeRs = ZNodePaths.joinZNode(zkw1.znodePaths.rsZNode, "hostname1.example.org:1234"); + ZKUtil.createWithParents(zkw1, fakeRs); + ZKClusterId.setClusterId(zkw1, new ClusterId()); + return ZKConfig.getZooKeeperClusterKey(testConf); + } + + @Before + public void setUp() throws IOException { + rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf); + rp = ReplicationFactory.getReplicationPeers(zkw, conf); + OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf); + + // Create hbase:replication meta table. + try (Admin admin = connection.getAdmin()) { + TableDescriptor table = + TableReplicationStorageBase.createReplicationTableDescBuilder(conf).build(); + admin.createTable(table); + } + } + + @After + public void tearDown() throws KeeperException, IOException { + // Drop the hbase:replication meta table. 
+ utility.deleteTable(TableReplicationStorageBase.REPLICATION_TABLE); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + if (connection != null) { + IOUtils.closeQuietly(connection); + } + utility.shutdownMiniZKCluster(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/31978c31/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateZKImpl.java new file mode 100644 index 0000000..993f2fb --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateZKImpl.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication.storage; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterId; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseZKTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.zookeeper.ZKClusterId; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.zookeeper.KeeperException; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestReplicationStateZKImpl extends TestReplicationStateBasic { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReplicationStateZKImpl.class); + + private static Configuration conf; + private static HBaseZKTestingUtility utility; + private static ZKWatcher zkw; + private static String replicationZNode; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + utility = new HBaseZKTestingUtility(); + utility.startMiniZKCluster(); + conf = utility.getConfiguration(); + conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); + zkw = utility.getZooKeeperWatcher(); + String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); + replicationZNode = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, 
replicationZNodeName); + KEY_ONE = initPeerClusterState("/hbase1"); + KEY_TWO = initPeerClusterState("/hbase2"); + } + + private static String initPeerClusterState(String baseZKNode) + throws IOException, KeeperException { + // Add a dummy region server and set up the cluster id + Configuration testConf = new Configuration(conf); + testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode); + ZKWatcher zkw1 = new ZKWatcher(testConf, "test1", null); + String fakeRs = ZNodePaths.joinZNode(zkw1.znodePaths.rsZNode, "hostname1.example.org:1234"); + ZKUtil.createWithParents(zkw1, fakeRs); + ZKClusterId.setClusterId(zkw1, new ClusterId()); + return ZKConfig.getZooKeeperClusterKey(testConf); + } + + @Before + public void setUp() { + zkTimeoutCount = 0; + rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf); + rp = ReplicationFactory.getReplicationPeers(zkw, conf); + OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf); + } + + @After + public void tearDown() throws KeeperException, IOException { + ZKUtil.deleteNodeRecursively(zkw, replicationZNode); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + utility.shutdownMiniZKCluster(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/31978c31/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationPeerStorage.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationPeerStorage.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationPeerStorage.java new file mode 100644 index 0000000..190eef4 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationPeerStorage.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.storage; + +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toSet; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.stream.Stream; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseZKTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, 
MediumTests.class }) +public class TestZKReplicationPeerStorage { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestZKReplicationPeerStorage.class); + + private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility(); + + private static ZKReplicationPeerStorage STORAGE; + + @BeforeClass + public static void setUp() throws Exception { + UTIL.startMiniZKCluster(); + STORAGE = new ZKReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()); + } + + @AfterClass + public static void tearDown() throws IOException { + UTIL.shutdownMiniZKCluster(); + } + + private Set<String> randNamespaces(Random rand) { + return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5)) + .collect(toSet()); + } + + private Map<TableName, List<String>> randTableCFs(Random rand) { + int size = rand.nextInt(5); + Map<TableName, List<String>> map = new HashMap<>(); + for (int i = 0; i < size; i++) { + TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong())); + List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong())) + .limit(rand.nextInt(5)).collect(toList()); + map.put(tn, cfs); + } + return map; + } + + private ReplicationPeerConfig getConfig(int seed) { + Random rand = new Random(seed); + return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(rand.nextLong())) + .setReplicationEndpointImpl(Long.toHexString(rand.nextLong())) + .setNamespaces(randNamespaces(rand)).setExcludeNamespaces(randNamespaces(rand)) + .setTableCFsMap(randTableCFs(rand)).setReplicateAllUserTables(rand.nextBoolean()) + .setBandwidth(rand.nextInt(1000)).build(); + } + + private void assertSetEquals(Set<String> expected, Set<String> actual) { + if (expected == null || expected.size() == 0) { + assertTrue(actual == null || actual.size() == 0); + return; + } + assertEquals(expected.size(), actual.size()); + expected.forEach(s -> assertTrue(actual.contains(s))); + } + + 
private void assertMapEquals(Map<TableName, List<String>> expected, + Map<TableName, List<String>> actual) { + if (expected == null || expected.size() == 0) { + assertTrue(actual == null || actual.size() == 0); + return; + } + assertEquals(expected.size(), actual.size()); + expected.forEach((expectedTn, expectedCFs) -> { + List<String> actualCFs = actual.get(expectedTn); + if (expectedCFs == null || expectedCFs.size() == 0) { + assertTrue(actual.containsKey(expectedTn)); + assertTrue(actualCFs == null || actualCFs.size() == 0); + } else { + assertNotNull(actualCFs); + assertEquals(expectedCFs.size(), actualCFs.size()); + for (Iterator<String> expectedIt = expectedCFs.iterator(), actualIt = actualCFs.iterator(); + expectedIt.hasNext();) { + assertEquals(expectedIt.next(), actualIt.next()); + } + } + }); + } + + private void assertConfigEquals(ReplicationPeerConfig expected, ReplicationPeerConfig actual) { + assertEquals(expected.getClusterKey(), actual.getClusterKey()); + assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl()); + assertSetEquals(expected.getNamespaces(), actual.getNamespaces()); + assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces()); + assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap()); + assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap()); + assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables()); + assertEquals(expected.getBandwidth(), actual.getBandwidth()); + } + + @Test + public void test() throws ReplicationException { + int peerCount = 10; + for (int i = 0; i < peerCount; i++) { + STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0); + } + List<String> peerIds = STORAGE.listPeerIds(); + assertEquals(peerCount, peerIds.size()); + for (String peerId : peerIds) { + int seed = Integer.parseInt(peerId); + assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId)); + } + for (int i = 0; i < peerCount; 
i++) { + STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1)); + } + for (String peerId : peerIds) { + int seed = Integer.parseInt(peerId); + assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId)); + } + for (int i = 0; i < peerCount; i++) { + assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i))); + } + for (int i = 0; i < peerCount; i++) { + STORAGE.setPeerState(Integer.toString(i), i % 2 != 0); + } + for (int i = 0; i < peerCount; i++) { + assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i))); + } + String toRemove = Integer.toString(peerCount / 2); + STORAGE.removePeer(toRemove); + peerIds = STORAGE.listPeerIds(); + assertEquals(peerCount - 1, peerIds.size()); + assertFalse(peerIds.contains(toRemove)); + + try { + STORAGE.getPeerConfig(toRemove); + fail("Should throw a ReplicationException when get peer config of a peerId"); + } catch (ReplicationException e) { + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/31978c31/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationQueueStorage.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationQueueStorage.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationQueueStorage.java new file mode 100644 index 0000000..780ff2a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationQueueStorage.java @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.storage;

import static org.hamcrest.CoreMatchers.hasItems;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorage;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Tests for {@link ZKReplicationQueueStorage} against a mini ZooKeeper cluster:
 * replicator listing, WAL queue add/remove/position tracking, queue claiming
 * (failover), and retry behavior when the queues znode cversion keeps changing.
 */
@Category({ ReplicationTests.class, MediumTests.class })
public class TestZKReplicationQueueStorage {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestZKReplicationQueueStorage.class);

  private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility();

  // Shared storage instance, created once the mini ZK cluster is up in setUp().
  private static ZKReplicationQueueStorage STORAGE;

  @BeforeClass
  public static void setUp() throws Exception {
    UTIL.startMiniZKCluster();
    STORAGE = new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
  }

  @AfterClass
  public static void tearDown() throws IOException {
    UTIL.shutdownMiniZKCluster();
  }

  /**
   * Clears all replicator queues and HFile-ref peers after each test so every test
   * method starts from an empty znode tree (STORAGE is shared across tests).
   */
  @After
  public void tearDownAfterTest() throws ReplicationException {
    for (ServerName serverName : STORAGE.getListOfReplicators()) {
      for (String queue : STORAGE.getAllQueues(serverName)) {
        STORAGE.removeQueue(serverName, queue);
      }
      STORAGE.removeReplicatorIfQueueIsEmpty(serverName);
    }
    for (String peerId : STORAGE.getAllPeersFromHFileRefsQueue()) {
      STORAGE.removePeerFromHFileRefs(peerId);
    }
  }

  /** Deterministic fake server name for index i (distinct port and start code). */
  private ServerName getServerName(int i) {
    return ServerName.valueOf("127.0.0.1", 8000 + i, 10000 + i);
  }

  /**
   * Adds one WAL per server for 10 servers, then removes the queues of the first 5
   * and verifies removeReplicatorIfQueueIsEmpty only deletes replicators whose
   * queues are gone.
   */
  @Test
  public void testReplicator() throws ReplicationException {
    assertTrue(STORAGE.getListOfReplicators().isEmpty());
    String queueId = "1";
    for (int i = 0; i < 10; i++) {
      STORAGE.addWAL(getServerName(i), queueId, "file" + i);
    }
    List<ServerName> replicators = STORAGE.getListOfReplicators();
    assertEquals(10, replicators.size());
    for (int i = 0; i < 10; i++) {
      assertThat(replicators, hasItems(getServerName(i)));
    }
    for (int i = 0; i < 5; i++) {
      STORAGE.removeQueue(getServerName(i), queueId);
    }
    // Attempt removal for all 10; only the 5 with empty queues should disappear.
    for (int i = 0; i < 10; i++) {
      STORAGE.removeReplicatorIfQueueIsEmpty(getServerName(i));
    }
    replicators = STORAGE.getListOfReplicators();
    assertEquals(5, replicators.size());
    for (int i = 5; i < 10; i++) {
      assertThat(replicators, hasItems(getServerName(i)));
    }
  }

  // Zero-pads the index so lexicographic order matches numeric order — the claimed
  // queue's WALs come back as a SortedSet and are iterated in order below.
  private String getFileName(String base, int i) {
    return String.format(base + "-%04d", i);
  }

  /**
   * Exercises WAL bookkeeping on two queues of one server: add 10 WALs each, verify
   * listing and positions (default 0, then distinct per-queue values), remove WALs of
   * alternating parity (even from queue1, odd from queue2), claim queue1 for a second
   * server, and verify queue contents, claimed-queue naming, carried-over positions,
   * and the global WAL set.
   */
  @Test
  public void testAddRemoveLog() throws ReplicationException {
    ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
    assertTrue(STORAGE.getAllQueues(serverName1).isEmpty());
    String queue1 = "1";
    String queue2 = "2";
    for (int i = 0; i < 10; i++) {
      STORAGE.addWAL(serverName1, queue1, getFileName("file1", i));
      STORAGE.addWAL(serverName1, queue2, getFileName("file2", i));
    }
    List<String> queueIds = STORAGE.getAllQueues(serverName1);
    assertEquals(2, queueIds.size());
    assertThat(queueIds, hasItems("1", "2"));

    List<String> wals1 = STORAGE.getWALsInQueue(serverName1, queue1);
    List<String> wals2 = STORAGE.getWALsInQueue(serverName1, queue2);
    assertEquals(10, wals1.size());
    assertEquals(10, wals2.size());
    for (int i = 0; i < 10; i++) {
      assertThat(wals1, hasItems(getFileName("file1", i)));
      assertThat(wals2, hasItems(getFileName("file2", i)));
    }

    // Positions default to 0 before being set; queue2's positions are offset by 10
    // so a cross-queue mix-up would be caught.
    for (int i = 0; i < 10; i++) {
      assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i)));
      assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i)));
      STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100, null);
      STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10,
        null);
    }

    for (int i = 0; i < 10; i++) {
      assertEquals((i + 1) * 100,
        STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i)));
      assertEquals((i + 1) * 100 + 10,
        STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i)));
    }

    // Remove even-indexed WALs from queue1 and odd-indexed from queue2, leaving
    // 5 odd file1 entries and 5 even file2 entries.
    for (int i = 0; i < 10; i++) {
      if (i % 2 == 0) {
        STORAGE.removeWAL(serverName1, queue1, getFileName("file1", i));
      } else {
        STORAGE.removeWAL(serverName1, queue2, getFileName("file2", i));
      }
    }

    queueIds = STORAGE.getAllQueues(serverName1);
    assertEquals(2, queueIds.size());
    assertThat(queueIds, hasItems("1", "2"));

    // serverName2 claims serverName1's queue "1" (failover takeover).
    ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001);
    Pair<String, SortedSet<String>> peer1 = STORAGE.claimQueue(serverName1, "1", serverName2);

    // The claimed queue is renamed "<queueId>-<sourceServer>".
    assertEquals("1-" + serverName1.getServerName(), peer1.getFirst());
    assertEquals(5, peer1.getSecond().size());
    int i = 1;
    // SortedSet iteration + zero-padded names means WALs arrive as file1-0001,
    // file1-0003, ...; their recorded positions must survive the claim.
    for (String wal : peer1.getSecond()) {
      assertEquals(getFileName("file1", i), wal);
      assertEquals((i + 1) * 100,
        STORAGE.getWALPosition(serverName2, peer1.getFirst(), getFileName("file1", i)));
      i += 2;
    }

    // Source server keeps only queue "2" with its 5 even-indexed file2 WALs.
    queueIds = STORAGE.getAllQueues(serverName1);
    assertEquals(1, queueIds.size());
    assertThat(queueIds, hasItems("2"));
    wals2 = STORAGE.getWALsInQueue(serverName1, queue2);
    assertEquals(5, wals2.size());
    for (i = 0; i < 10; i += 2) {
      assertThat(wals2, hasItems(getFileName("file2", i)));
    }

    // Claiming server now owns the renamed queue with the 5 odd-indexed file1 WALs.
    queueIds = STORAGE.getAllQueues(serverName2);
    assertEquals(1, queueIds.size());
    assertThat(queueIds, hasItems(peer1.getFirst()));
    wals1 = STORAGE.getWALsInQueue(serverName2, peer1.getFirst());
    assertEquals(5, wals1.size());
    for (i = 1; i < 10; i += 2) {
      assertThat(wals1, hasItems(getFileName("file1", i)));
    }

    // Global WAL set spans both servers: even file2 names plus odd file1 names.
    Set<String> allWals = STORAGE.getAllWALs();
    assertEquals(10, allWals.size());
    for (i = 0; i < 10; i++) {
      assertThat(allWals, hasItems(i % 2 == 0 ? getFileName("file2", i) : getFileName("file1", i)));
    }
  }

  // For HBASE-12865
  @Test
  public void testClaimQueueChangeCversion() throws ReplicationException, KeeperException {
    ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
    STORAGE.addWAL(serverName1, "1", "file");

    int v0 = STORAGE.getQueuesZNodeCversion();
    ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001);
    STORAGE.claimQueue(serverName1, "1", serverName2);
    int v1 = STORAGE.getQueuesZNodeCversion();
    // cversion should increase by 1 since a child node is deleted
    assertEquals(1, v1 - v0);
  }

  /**
   * Returns a storage whose queues-znode cversion appears to change on each of the
   * first 4 reads (simulating concurrent modifications), then stabilizes — used to
   * verify that getAllWALs/getAllHFileRefs retry until the cversion is stable.
   */
  private ZKReplicationQueueStorage createWithUnstableCversion() throws IOException {
    return new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()) {

      // Number of cversion reads so far; returned as the fake cversion, capped at 4.
      private int called = 0;

      @Override
      public int getQueuesZNodeCversion() throws KeeperException {
        if (called < 4) {
          called++;
        }
        return called;
      }
    };
  }

  @Test
  public void testGetAllWALsCversionChange() throws IOException, ReplicationException {
    ZKReplicationQueueStorage storage = createWithUnstableCversion();
    storage.addWAL(getServerName(0), "1", "file");
    // This should return eventually when cversion stabilizes
    Set<String> allWals = storage.getAllWALs();
    assertEquals(1, allWals.size());
    assertThat(allWals, hasItems("file"));
  }

  // For HBASE-14621
  @Test
  public void testGetAllHFileRefsCversionChange() throws IOException, ReplicationException {
    ZKReplicationQueueStorage storage = createWithUnstableCversion();
    storage.addPeerToHFileRefs("1");
    Path p = new Path("/test");
    storage.addHFileRefs("1", Arrays.asList(Pair.newPair(p, p)));
    // This should return eventually when cversion stabilizes
    // (only the file name "test" is stored, not the full path).
    Set<String> allHFileRefs = storage.getAllHFileRefs();
    assertEquals(1, allHFileRefs.size());
    assertThat(allHFileRefs, hasItems("test"));
  }
}