http://git-wip-us.apache.org/repos/asf/hbase/blob/b75cffe0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java index 6e27a21..d8f9625 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java @@ -21,13 +21,13 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.stream.Collectors; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileStatus; @@ -48,17 +48,18 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationQueues; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; +import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicLongMap; /** @@ -303,57 +304,53 @@ public class DumpReplicationQueues extends Configured implements Tool { } public String dumpQueues(ClusterConnection connection, ZKWatcher zkw, Set<String> peerIds, - boolean hdfs) throws Exception { - ReplicationQueuesClient queuesClient; + boolean hdfs) throws Exception { + ReplicationQueueStorage queueStorage; ReplicationPeers replicationPeers; ReplicationQueues replicationQueues; ReplicationTracker replicationTracker; - ReplicationQueuesClientArguments replicationArgs = - new ReplicationQueuesClientArguments(getConf(), new WarnOnlyAbortable(), zkw); + ReplicationQueuesArguments replicationArgs = + new ReplicationQueuesArguments(getConf(), new WarnOnlyAbortable(), zkw); StringBuilder sb = new StringBuilder(); - queuesClient = ReplicationFactory.getReplicationQueuesClient(replicationArgs); - queuesClient.init(); + queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); replicationQueues = ReplicationFactory.getReplicationQueues(replicationArgs); - replicationPeers = ReplicationFactory.getReplicationPeers(zkw, getConf(), queuesClient, connection); + replicationPeers = + ReplicationFactory.getReplicationPeers(zkw, getConf(), queueStorage, connection); replicationTracker = ReplicationFactory.getReplicationTracker(zkw, replicationPeers, getConf(), new WarnOnlyAbortable(), new WarnOnlyStoppable()); - List<String> liveRegionServers = replicationTracker.getListOfRegionServers(); + Set<String> liveRegionServers = new HashSet<>(replicationTracker.getListOfRegionServers()); // Loops each peer on each RS and dumps the queues - try { - List<String> regionservers = queuesClient.getListOfReplicators(); - if (regionservers == null || regionservers.isEmpty()) { - return sb.toString(); + List<ServerName> regionservers = queueStorage.getListOfReplicators(); + if (regionservers == null || regionservers.isEmpty()) { + return sb.toString(); + } + for (ServerName regionserver : regionservers) { + List<String> queueIds = queueStorage.getAllQueues(regionserver); + replicationQueues.init(regionserver.getServerName()); + if (!liveRegionServers.contains(regionserver.getServerName())) { + deadRegionServers.add(regionserver.getServerName()); } - for (String regionserver : regionservers) { - List<String> queueIds = queuesClient.getAllQueues(regionserver); - replicationQueues.init(regionserver); - if (!liveRegionServers.contains(regionserver)) { - deadRegionServers.add(regionserver); - } - for (String queueId : queueIds) { - ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); - List<String> wals = queuesClient.getLogsInQueue(regionserver, queueId); - if (!peerIds.contains(queueInfo.getPeerId())) { - deletedQueues.add(regionserver + "/" + queueId); - sb.append(formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, true, - hdfs)); - } else { - sb.append(formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, false, - hdfs)); - } + for (String queueId : queueIds) { + ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); + List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId); + if (!peerIds.contains(queueInfo.getPeerId())) { + deletedQueues.add(regionserver + "/" + queueId); + sb.append( + formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, true, hdfs)); + } else { + sb.append( + formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, false, hdfs)); } } - } catch (KeeperException ke) { - throw new IOException(ke); } return sb.toString(); } - private String formatQueue(String regionserver, ReplicationQueues replicationQueues, ReplicationQueueInfo queueInfo, - String queueId, List<String> wals, boolean isDeleted, boolean hdfs) throws Exception { - + private String formatQueue(ServerName regionserver, ReplicationQueues replicationQueues, + ReplicationQueueInfo queueInfo, String queueId, List<String> wals, boolean isDeleted, + boolean hdfs) throws Exception { StringBuilder sb = new StringBuilder(); List<ServerName> deadServers; @@ -389,13 +386,14 @@ public class DumpReplicationQueues extends Configured implements Tool { /** * return total size in bytes from a list of WALs */ - private long getTotalWALSize(FileSystem fs, List<String> wals, String server) throws IOException { + private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName server) + throws IOException { long size = 0; FileStatus fileStatus; for (String wal : wals) { try { - fileStatus = (new WALLink(getConf(), server, wal)).getFileStatus(fs); + fileStatus = (new WALLink(getConf(), server.getServerName(), wal)).getFileStatus(fs); } catch (IOException e) { if (e instanceof FileNotFoundException) { numWalsNotFound++;
http://git-wip-us.apache.org/repos/asf/hbase/blob/b75cffe0/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java index 839b5ad..85fa729 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.util.hbck; import java.io.IOException; @@ -27,22 +26,23 @@ import java.util.Map.Entry; import java.util.Set; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.util.HBaseFsck; import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; -/* +/** * Check and fix undeleted replication queues for removed peerId. */ @InterfaceAudience.Private public class ReplicationChecker { private final ErrorReporter errorReporter; // replicator with its queueIds for removed peers - private Map<String, List<String>> undeletedQueueIds = new HashMap<>(); + private Map<ServerName, List<String>> undeletedQueueIds = new HashMap<>(); // replicator with its undeleted queueIds for removed peers in hfile-refs queue private Set<String> undeletedHFileRefsQueueIds = new HashSet<>(); private final ReplicationZKNodeCleaner cleaner; @@ -60,8 +60,8 @@ public class ReplicationChecker { public void checkUnDeletedQueues() throws IOException { undeletedQueueIds = cleaner.getUnDeletedQueues(); - for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) { - String replicator = replicatorAndQueueIds.getKey(); + for (Entry<ServerName, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) { + ServerName replicator = replicatorAndQueueIds.getKey(); for (String queueId : replicatorAndQueueIds.getValue()) { ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); String msg = "Undeleted replication queue for removed peer found: " http://git-wip-us.apache.org/repos/asf/hbase/blob/b75cffe0/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java index 9f3740f..cc57dfb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -34,12 +35,16 @@ import java.util.Set; import java.util.concurrent.CompletionException; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient; import org.junit.After; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -61,8 +66,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase { private final String ID_ONE = "1"; private final String KEY_ONE = "127.0.0.1:2181:/hbase"; - private final String ID_SECOND = "2"; - private final String KEY_SECOND = "127.0.0.1:2181:/hbase2"; + private final String ID_TWO = "2"; + private final String KEY_TWO = "127.0.0.1:2181:/hbase2"; @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -70,21 +75,27 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase { TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000); TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); + TEST_UTIL.getConfiguration().setInt(ReadOnlyZKClient.RECOVERY_RETRY, 1); TEST_UTIL.startMiniCluster(); ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); } @After - public void cleanupPeer() { + public void clearPeerAndQueues() throws IOException, ReplicationException { try { admin.removeReplicationPeer(ID_ONE).join(); } catch (Exception e) { - LOG.debug("Replication peer " + ID_ONE + " may already be removed"); } try { - admin.removeReplicationPeer(ID_SECOND).join(); + admin.removeReplicationPeer(ID_TWO).join(); } catch (Exception e) { - LOG.debug("Replication peer " + ID_SECOND + " may already be removed"); + } + ReplicationQueueStorage queueStorage = ReplicationStorageFactory + .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration()); + for (ServerName serverName : queueStorage.getListOfReplicators()) { + for (String queue : queueStorage.getAllQueues(serverName)) { + queueStorage.removeQueue(serverName, queue); + } } } @@ -93,7 +104,7 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase { ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); rpc1.setClusterKey(KEY_ONE); ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); - rpc2.setClusterKey(KEY_SECOND); + rpc2.setClusterKey(KEY_TWO); // Add a valid peer admin.addReplicationPeer(ID_ONE, rpc1).join(); // try adding the same (fails) @@ -106,19 +117,19 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase { assertEquals(1, admin.listReplicationPeers().get().size()); // Try to remove an inexisting peer try { - admin.removeReplicationPeer(ID_SECOND).join(); + admin.removeReplicationPeer(ID_TWO).join(); fail("Test case should fail as removing a inexisting peer."); } catch (CompletionException e) { // OK! } assertEquals(1, admin.listReplicationPeers().get().size()); // Add a second since multi-slave is supported - admin.addReplicationPeer(ID_SECOND, rpc2).join(); + admin.addReplicationPeer(ID_TWO, rpc2).join(); assertEquals(2, admin.listReplicationPeers().get().size()); // Remove the first peer we added admin.removeReplicationPeer(ID_ONE).join(); assertEquals(1, admin.listReplicationPeers().get().size()); - admin.removeReplicationPeer(ID_SECOND).join(); + admin.removeReplicationPeer(ID_TWO).join(); assertEquals(0, admin.listReplicationPeers().get().size()); } http://git-wip-us.apache.org/repos/asf/hbase/blob/b75cffe0/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index 29a577b..6596c6c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterR import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.ReplicationEndpointForTest; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -89,6 +90,7 @@ public class TestReplicationAdmin { @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + TEST_UTIL.getConfiguration().setInt(ReadOnlyZKClient.RECOVERY_RETRY, 1); TEST_UTIL.startMiniCluster(); admin = new ReplicationAdmin(TEST_UTIL.getConfiguration()); hbaseAdmin = TEST_UTIL.getAdmin(); http://git-wip-us.apache.org/repos/asf/hbase/blob/b75cffe0/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index 210e5c9..dd7aa6a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -22,17 +22,12 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.doAnswer; import java.io.IOException; -import java.lang.reflect.Field; import java.net.URLEncoder; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Random; -import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; @@ -54,9 +49,6 @@ import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -69,9 +61,6 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -207,28 +196,12 @@ public class TestLogsCleaner { } } + /** + * ReplicationLogCleaner should be able to ride over ZooKeeper errors without aborting. + */ @Test - public void testZnodeCversionChange() throws Exception { - Configuration conf = TEST_UTIL.getConfiguration(); - ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); - cleaner.setConf(conf); - - ReplicationQueuesClientZKImpl rqcMock = Mockito.mock(ReplicationQueuesClientZKImpl.class); - Mockito.when(rqcMock.getQueuesZNodeCversion()).thenReturn(1, 2, 3, 4); - - Field rqc = ReplicationLogCleaner.class.getDeclaredField("replicationQueues"); - rqc.setAccessible(true); - - rqc.set(cleaner, rqcMock); - - // This should return eventually when cversion stabilizes - cleaner.getDeletableFiles(new LinkedList<>()); - } - - @Test(timeout=10000) - public void testZooKeeperAbortDuringGetListOfReplicators() throws Exception { + public void testZooKeeperAbort() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); - ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); List<FileStatus> dummyFiles = Lists.newArrayList( @@ -236,37 +209,29 @@ public class TestLogsCleaner { new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2")) ); - FaultyZooKeeperWatcher faultyZK = - new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null); - final AtomicBoolean getListOfReplicatorsFailed = new AtomicBoolean(false); - - try { + try (FaultyZooKeeperWatcher faultyZK = new FaultyZooKeeperWatcher(conf, + "testZooKeeperAbort-faulty", null)) { faultyZK.init(); - ReplicationQueuesClient replicationQueuesClient = spy(ReplicationFactory.getReplicationQueuesClient( - new ReplicationQueuesClientArguments(conf, new ReplicationLogCleaner.WarnOnlyAbortable(), faultyZK))); - doAnswer(new Answer<Object>() { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - try { - return invocation.callRealMethod(); - } catch (KeeperException.ConnectionLossException e) { - getListOfReplicatorsFailed.set(true); - throw e; - } - } - }).when(replicationQueuesClient).getListOfReplicators(); - replicationQueuesClient.init(); - - cleaner.setConf(conf, faultyZK, replicationQueuesClient); - // should keep all files due to a ConnectionLossException getting the queues znodes + cleaner.setConf(conf, faultyZK); cleaner.preClean(); + // should keep all files due to a ConnectionLossException getting the queues znodes Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles); - - assertTrue(getListOfReplicatorsFailed.get()); assertFalse(toDelete.iterator().hasNext()); assertFalse(cleaner.isStopped()); - } finally { - faultyZK.close(); + } + + // when zk is working both files should be returned + cleaner = new ReplicationLogCleaner(); + try (ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null)) { + cleaner.setConf(conf, zkw); + cleaner.preClean(); + Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles); + Iterator<FileStatus> iter = filesToDelete.iterator(); + assertTrue(iter.hasNext()); + assertEquals(new Path("log1"), iter.next().getPath()); + assertTrue(iter.hasNext()); + assertEquals(new Path("log2"), iter.next().getPath()); + assertFalse(iter.hasNext()); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/b75cffe0/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java index f0779af..102c8d9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java @@ -25,7 +25,6 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; import java.io.IOException; -import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -51,7 +50,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -68,7 +66,6 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -200,32 +197,6 @@ public class TestReplicationHFileCleaner { assertTrue(deletableFilesIterator.next().getPath().equals(deletablefile)); } - /* - * Test for HBASE-14621. This test will not assert directly anything. Without the fix the test - * will end up in a infinite loop, so it will timeout. - */ - @Test - public void testForDifferntHFileRefsZnodeVersion() throws Exception { - // 1. Create a file - Path file = new Path(root, "testForDifferntHFileRefsZnodeVersion"); - fs.createNewFile(file); - // 2. Assert file is successfully created - assertTrue("Test file not created!", fs.exists(file)); - ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner(); - cleaner.setConf(conf); - - ReplicationQueuesClient replicationQueuesClient = Mockito.mock(ReplicationQueuesClient.class); - //Return different znode version for each call - Mockito.when(replicationQueuesClient.getHFileRefsNodeChangeVersion()).thenReturn(1, 2); - - Class<? extends ReplicationHFileCleaner> cleanerClass = cleaner.getClass(); - Field rqc = cleanerClass.getDeclaredField("rqc"); - rqc.setAccessible(true); - rqc.set(cleaner, replicationQueuesClient); - - cleaner.isFileDeletable(fs.getFileStatus(file)); - } - /** * ReplicationHFileCleaner should be able to ride over ZooKeeper errors without aborting. */ http://git-wip-us.apache.org/repos/asf/hbase/blob/b75cffe0/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java index b0910e2..22ed1a3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.master.cleaner; import static org.junit.Assert.assertEquals; @@ -26,6 +25,7 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; @@ -49,9 +49,9 @@ public class TestReplicationZKNodeCleaner { private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private final String ID_ONE = "1"; - private final String SERVER_ONE = "server1"; + private final ServerName SERVER_ONE = ServerName.valueOf("server1", 8000, 1234); private final String ID_TWO = "2"; - private final String SERVER_TWO = "server2"; + private final ServerName SERVER_TWO = ServerName.valueOf("server2", 8000, 1234); private final Configuration conf; private final ZKWatcher zkw; @@ -78,12 +78,12 @@ public class TestReplicationZKNodeCleaner { @Test public void testReplicationZKNodeCleaner() throws Exception { - repQueues.init(SERVER_ONE); + repQueues.init(SERVER_ONE.getServerName()); // add queue for ID_ONE which isn't exist repQueues.addLog(ID_ONE, "file1"); ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null); - Map<String, List<String>> undeletedQueues = cleaner.getUnDeletedQueues(); + Map<ServerName, List<String>> undeletedQueues = cleaner.getUnDeletedQueues(); assertEquals(1, undeletedQueues.size()); assertTrue(undeletedQueues.containsKey(SERVER_ONE)); assertEquals(1, undeletedQueues.get(SERVER_ONE).size()); @@ -106,7 +106,7 @@ public class TestReplicationZKNodeCleaner { @Test public void testReplicationZKNodeCleanerChore() throws Exception { - repQueues.init(SERVER_ONE); + repQueues.init(SERVER_ONE.getServerName()); // add queue for ID_ONE which isn't exist repQueues.addLog(ID_ONE, "file1"); // add a recovery queue for ID_TWO which isn't exist http://git-wip-us.apache.org/repos/asf/hbase/blob/b75cffe0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java deleted file mode 100644 index 29c0930..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ /dev/null @@ -1,378 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.replication; - -import static org.junit.Assert.*; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.zookeeper.ZKConfig; -import org.apache.zookeeper.KeeperException; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * White box testing for replication state interfaces. Implementations should extend this class, and - * initialize the interfaces properly. - */ -public abstract class TestReplicationStateBasic { - - protected ReplicationQueues rq1; - protected ReplicationQueues rq2; - protected ReplicationQueues rq3; - protected ReplicationQueuesClient rqc; - protected String server1 = ServerName.valueOf("hostname1.example.org", 1234, -1L).toString(); - protected String server2 = ServerName.valueOf("hostname2.example.org", 1234, -1L).toString(); - protected String server3 = ServerName.valueOf("hostname3.example.org", 1234, -1L).toString(); - protected ReplicationPeers rp; - protected static final String ID_ONE = "1"; - protected static final String ID_TWO = "2"; - protected static String KEY_ONE; - protected static String KEY_TWO; - - // For testing when we try to replicate to ourself - protected String OUR_ID = "3"; - protected String OUR_KEY; - - protected static int zkTimeoutCount; - protected static final int ZK_MAX_COUNT = 300; - protected static final int ZK_SLEEP_INTERVAL = 100; // millis - - private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class); - - @Before - public void setUp() { - zkTimeoutCount = 0; - } - - @Test - public void testReplicationQueuesClient() throws ReplicationException, KeeperException { - rqc.init(); - // Test methods with empty state - assertEquals(0, rqc.getListOfReplicators().size()); - assertNull(rqc.getLogsInQueue(server1, "qId1")); - assertNull(rqc.getAllQueues(server1)); - - /* - * Set up data Two replicators: -- server1: three queues with 0, 1 and 2 log files each -- - * server2: zero queues - */ - rq1.init(server1); - rq2.init(server2); - rq1.addLog("qId1", "trash"); - rq1.removeLog("qId1", "trash"); - rq1.addLog("qId2", "filename1"); - rq1.addLog("qId3", "filename2"); - rq1.addLog("qId3", "filename3"); - rq2.addLog("trash", "trash"); - rq2.removeQueue("trash"); - - List<String> reps = rqc.getListOfReplicators(); - assertEquals(2, reps.size()); - assertTrue(server1, reps.contains(server1)); - assertTrue(server2, reps.contains(server2)); - - assertNull(rqc.getLogsInQueue("bogus", "bogus")); - assertNull(rqc.getLogsInQueue(server1, "bogus")); - assertEquals(0, rqc.getLogsInQueue(server1, "qId1").size()); - assertEquals(1, rqc.getLogsInQueue(server1, "qId2").size()); - assertEquals("filename1", rqc.getLogsInQueue(server1, "qId2").get(0)); - - assertNull(rqc.getAllQueues("bogus")); - assertEquals(0, rqc.getAllQueues(server2).size()); - List<String> list = rqc.getAllQueues(server1); - assertEquals(3, list.size()); - assertTrue(list.contains("qId2")); - assertTrue(list.contains("qId3")); - } - - @Test - public void testReplicationQueues() throws ReplicationException { - rq1.init(server1); - rq2.init(server2); - rq3.init(server3); - //Initialize ReplicationPeer so we can add peers (we don't transfer lone queues) - rp.init(); - - // 3 replicators should exist - assertEquals(3, rq1.getListOfReplicators().size()); - rq1.removeQueue("bogus"); - rq1.removeLog("bogus", "bogus"); - rq1.removeAllQueues(); - assertEquals(0, rq1.getAllQueues().size()); - assertEquals(0, rq1.getLogPosition("bogus", "bogus")); - assertNull(rq1.getLogsInQueue("bogus")); - assertNull(rq1.getUnClaimedQueueIds( - ServerName.valueOf("bogus", 1234, -1L).toString())); - - rq1.setLogPosition("bogus", "bogus", 5L); - - populateQueues(); - - assertEquals(3, rq1.getListOfReplicators().size()); - assertEquals(0, rq2.getLogsInQueue("qId1").size()); - assertEquals(5, rq3.getLogsInQueue("qId5").size()); - assertEquals(0, rq3.getLogPosition("qId1", "filename0")); - rq3.setLogPosition("qId5", "filename4", 354L); - assertEquals(354L, rq3.getLogPosition("qId5", "filename4")); - - assertEquals(5, rq3.getLogsInQueue("qId5").size()); - assertEquals(0, rq2.getLogsInQueue("qId1").size()); - assertEquals(0, rq1.getAllQueues().size()); - assertEquals(1, rq2.getAllQueues().size()); - assertEquals(5, rq3.getAllQueues().size()); - - assertEquals(0, rq3.getUnClaimedQueueIds(server1).size()); - rq3.removeReplicatorIfQueueIsEmpty(server1); - assertEquals(2, rq3.getListOfReplicators().size()); - - List<String> queues = rq2.getUnClaimedQueueIds(server3); - assertEquals(5, queues.size()); - for(String queue: queues) { - rq2.claimQueue(server3, queue); - } - rq2.removeReplicatorIfQueueIsEmpty(server3); - assertEquals(1, rq2.getListOfReplicators().size()); - - // Try to claim our own queues - assertNull(rq2.getUnClaimedQueueIds(server2)); - rq2.removeReplicatorIfQueueIsEmpty(server2); - - assertEquals(6, rq2.getAllQueues().size()); - - rq2.removeAllQueues(); - - assertEquals(0, rq2.getListOfReplicators().size()); - } - - @Test - public void testInvalidClusterKeys() throws ReplicationException, KeeperException { - rp.init(); - - try { - rp.registerPeer(ID_ONE, - new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:hbase")); - fail("Should throw an IllegalArgumentException because " - + "zookeeper.znode.parent is missing leading '/'."); - } catch (IllegalArgumentException e) { - // Expected. - } - - try { - rp.registerPeer(ID_ONE, - new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:/")); - fail("Should throw an IllegalArgumentException because zookeeper.znode.parent is missing."); - } catch (IllegalArgumentException e) { - // Expected. - } - - try { - rp.registerPeer(ID_ONE, - new ReplicationPeerConfig().setClusterKey("hostname1.example.org::/hbase")); - fail("Should throw an IllegalArgumentException because " - + "hbase.zookeeper.property.clientPort is missing."); - } catch (IllegalArgumentException e) { - // Expected. - } - } - - @Test - public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException { - rp.init(); - rq1.init(server1); - rqc.init(); - - List<Pair<Path, Path>> files1 = new ArrayList<>(3); - files1.add(new Pair<>(null, new Path("file_1"))); - files1.add(new Pair<>(null, new Path("file_2"))); - files1.add(new Pair<>(null, new Path("file_3"))); - assertNull(rqc.getReplicableHFiles(ID_ONE)); - assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size()); - rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); - rq1.addPeerToHFileRefs(ID_ONE); - rq1.addHFileRefs(ID_ONE, files1); - assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size()); - assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size()); - List<String> hfiles2 = new ArrayList<>(files1.size()); - for (Pair<Path, Path> p : files1) { - hfiles2.add(p.getSecond().getName()); - } - String removedString = hfiles2.remove(0); - rq1.removeHFileRefs(ID_ONE, hfiles2); - assertEquals(1, rqc.getReplicableHFiles(ID_ONE).size()); - hfiles2 = new ArrayList<>(1); - hfiles2.add(removedString); - rq1.removeHFileRefs(ID_ONE, hfiles2); - assertEquals(0, rqc.getReplicableHFiles(ID_ONE).size()); - rp.unregisterPeer(ID_ONE); - } - - @Test - public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException { - rq1.init(server1); - rqc.init(); - - rp.init(); - rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); - rq1.addPeerToHFileRefs(ID_ONE); - rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO)); - rq1.addPeerToHFileRefs(ID_TWO); - - List<Pair<Path, Path>> files1 = new ArrayList<>(3); - files1.add(new Pair<>(null, new Path("file_1"))); - files1.add(new Pair<>(null, new Path("file_2"))); - files1.add(new Pair<>(null, new Path("file_3"))); - rq1.addHFileRefs(ID_ONE, files1); - rq1.addHFileRefs(ID_TWO, files1); - assertEquals(2, rqc.getAllPeersFromHFileRefsQueue().size()); - assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size()); - assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size()); - - rp.unregisterPeer(ID_ONE); - rq1.removePeerFromHFileRefs(ID_ONE); - assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size()); - assertNull(rqc.getReplicableHFiles(ID_ONE)); - assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size()); - - rp.unregisterPeer(ID_TWO); - rq1.removePeerFromHFileRefs(ID_TWO); - assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size()); - assertNull(rqc.getReplicableHFiles(ID_TWO)); - } - - @Test - public void testReplicationPeers() throws Exception { - rp.init(); - - // Test methods with non-existent peer ids - try { - rp.unregisterPeer("bogus"); - fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); - } catch (IllegalArgumentException e) { - } - try { - rp.enablePeer("bogus"); - fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); - } catch (IllegalArgumentException e) { - } - try { - rp.disablePeer("bogus"); - fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); - } catch (IllegalArgumentException e) { - } - try { - rp.getStatusOfPeer("bogus"); - fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); - } catch (IllegalArgumentException e) { - } - assertFalse(rp.peerConnected("bogus")); - rp.peerDisconnected("bogus"); - - assertNull(rp.getPeerConf("bogus")); - assertNumberOfPeers(0); - - // Add some peers - rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); - assertNumberOfPeers(1); - rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO)); - assertNumberOfPeers(2); - - // Test methods with a peer that is added but not connected - try { - rp.getStatusOfPeer(ID_ONE); - fail("There are no connected peers, should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException e) { - } - assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond())); - rp.unregisterPeer(ID_ONE); - rp.peerDisconnected(ID_ONE); - assertNumberOfPeers(1); - - // Add one peer - rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); - rp.peerConnected(ID_ONE); - assertNumberOfPeers(2); - assertTrue(rp.getStatusOfPeer(ID_ONE)); - rp.disablePeer(ID_ONE); - assertConnectedPeerStatus(false, ID_ONE); - rp.enablePeer(ID_ONE); - assertConnectedPeerStatus(true, ID_ONE); - - // Disconnect peer - rp.peerDisconnected(ID_ONE); - assertNumberOfPeers(2); - try { - rp.getStatusOfPeer(ID_ONE); - fail("There are no connected peers, should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException e) { - } - } - - protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception { - // we can first check if the value was changed in the store, if it wasn't then fail right away - if (status != rp.getStatusOfPeerFromBackingStore(peerId)) { - fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK"); - } - while (true) { - if (status == rp.getStatusOfPeer(peerId)) { - return; - } - if (zkTimeoutCount < ZK_MAX_COUNT) { - LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status - + ", sleeping and trying again."); - Thread.sleep(ZK_SLEEP_INTERVAL); - } else { - fail("Timed out waiting for ConnectedPeerStatus to be " + status); - } - } - } - - protected void assertNumberOfPeers(int total) { - assertEquals(total, rp.getAllPeerConfigs().size()); - assertEquals(total, rp.getAllPeerIds().size()); - assertEquals(total, rp.getAllPeerIds().size()); - } - - /* - * three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2, - * 3, 4, 5 log files respectively - */ - protected void populateQueues() throws ReplicationException { - rq1.addLog("trash", "trash"); - rq1.removeQueue("trash"); - - rq2.addLog("qId1", "trash"); - rq2.removeLog("qId1", "trash"); - - for (int i = 1; i < 6; i++) { - for (int j = 0; j < i; j++) { - rq3.addLog("qId" + i, "filename" + j); - } - //Add peers for the corresponding queues so they are not orphans - rp.registerPeer("qId" + i, new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus" + i)); - } - } -} - http://git-wip-us.apache.org/repos/asf/hbase/blob/b75cffe0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java deleted file mode 100644 index d51d3c3..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java +++ /dev/null @@ -1,232 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.IOException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hbase.ChoreService; -import org.apache.hadoop.hbase.ClusterId; -import org.apache.hadoop.hbase.CoordinatedStateManager; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.ClusterConnection; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.ReplicationTests; -import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; -import org.apache.hadoop.hbase.zookeeper.ZKClusterId; -import org.apache.hadoop.hbase.zookeeper.ZKConfig; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.hadoop.hbase.zookeeper.ZNodePaths; -import org.apache.zookeeper.KeeperException; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Category({ReplicationTests.class, MediumTests.class}) -public class TestReplicationStateZKImpl extends TestReplicationStateBasic { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestReplicationStateZKImpl.class); - - private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateZKImpl.class); - - private static Configuration conf; - private static HBaseTestingUtility utility; - private static ZKWatcher zkw; - private static String replicationZNode; - private ReplicationQueuesZKImpl rqZK; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - utility = new HBaseTestingUtility(); - utility.startMiniZKCluster(); - conf = utility.getConfiguration(); - conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); - zkw = HBaseTestingUtility.getZooKeeperWatcher(utility); - String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); - replicationZNode = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, replicationZNodeName); - KEY_ONE = initPeerClusterState("/hbase1"); - KEY_TWO = initPeerClusterState("/hbase2"); - } - - private static String initPeerClusterState(String baseZKNode) - throws IOException, KeeperException { - // Add a dummy region server and set up the cluster id - Configuration testConf = new Configuration(conf); - testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode); - ZKWatcher zkw1 = new ZKWatcher(testConf, "test1", null); - String fakeRs = ZNodePaths.joinZNode(zkw1.znodePaths.rsZNode, "hostname1.example.org:1234"); - ZKUtil.createWithParents(zkw1, fakeRs); - ZKClusterId.setClusterId(zkw1, new ClusterId()); - return ZKConfig.getZooKeeperClusterKey(testConf); - } - - @Before - @Override - public void setUp() { - super.setUp(); - DummyServer ds1 = new DummyServer(server1); - DummyServer ds2 = new DummyServer(server2); - DummyServer ds3 = new DummyServer(server3); - try { - rq1 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds1, zkw)); - rq2 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds2, zkw)); - rq3 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds3, zkw)); - rqc = ReplicationFactory.getReplicationQueuesClient( - new ReplicationQueuesClientArguments(conf, ds1, zkw)); - } catch (Exception e) { - // This should not occur, because getReplicationQueues() only throws for - // TableBasedReplicationQueuesImpl - fail("ReplicationFactory.getReplicationQueues() threw an IO Exception"); - } - rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw); - OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf); - rqZK = new ReplicationQueuesZKImpl(zkw, conf, ds1); - } - - @After - public void tearDown() throws KeeperException, IOException { - ZKUtil.deleteNodeRecursively(zkw, replicationZNode); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - utility.shutdownMiniZKCluster(); - } - - @Test - public void testIsPeerPath_PathToParentOfPeerNode() { - assertFalse(rqZK.isPeerPath(rqZK.peersZNode)); - } - - @Test - public void testIsPeerPath_PathToChildOfPeerNode() { - String peerChild = ZNodePaths.joinZNode(ZNodePaths.joinZNode(rqZK.peersZNode, "1"), "child"); - assertFalse(rqZK.isPeerPath(peerChild)); - } - - @Test - public void testIsPeerPath_ActualPeerPath() { - String peerPath = ZNodePaths.joinZNode(rqZK.peersZNode, "1"); - assertTrue(rqZK.isPeerPath(peerPath)); - } - - static class DummyServer implements Server { - private String serverName; - private boolean isAborted = false; - private boolean isStopped = false; - - public DummyServer(String serverName) { - this.serverName = serverName; - } - - @Override - public Configuration getConfiguration() { - return conf; - } - - @Override - public ZKWatcher getZooKeeper() { - return zkw; - } - - @Override - public CoordinatedStateManager getCoordinatedStateManager() { - return null; - } - - @Override - public ClusterConnection getConnection() { - return null; - } - - @Override - public MetaTableLocator getMetaTableLocator() { - return null; - } - - @Override - public ServerName getServerName() { - return ServerName.valueOf(this.serverName); - } - - @Override - public void abort(String why, Throwable e) { - LOG.info("Aborting " + serverName); - this.isAborted = true; - } - - @Override - public boolean isAborted() { - return this.isAborted; - } - - @Override - public void stop(String why) { - this.isStopped = true; - } - - @Override - public boolean isStopped() { - return this.isStopped; - } - - @Override - public ChoreService getChoreService() { - return null; - } - - @Override - public ClusterConnection getClusterConnection() { - // TODO Auto-generated method stub - return null; - } - - @Override - public FileSystem getFileSystem() { - return null; - } - - @Override - public boolean isStopping() { - return false; - } - - @Override - public Connection createConnection(Configuration conf) throws IOException { - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/b75cffe0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java index 64db7a8..4d81d1d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hbase.replication.regionserver; - -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.util.List; @@ -31,8 +29,6 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; @@ -119,41 +115,4 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan server.stop(""); } - - @Test - public void testFailoverDeadServerCversionChange() throws Exception { - final Server s0 = new DummyServer("cversion-change0.example.org"); - ReplicationQueues repQueues = - ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, s0, - s0.getZooKeeper())); - repQueues.init(s0.getServerName().toString()); - // populate some znodes in the peer znode - files.add("log1"); - files.add("log2"); - for (String file : files) { - repQueues.addLog("1", file); - } - // simulate queue transfer - Server s1 = new DummyServer("cversion-change1.example.org"); - ReplicationQueues rq1 = - ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1, - s1.getZooKeeper())); - rq1.init(s1.getServerName().toString()); - - ReplicationQueuesClientZKImpl client = - (ReplicationQueuesClientZKImpl)ReplicationFactory.getReplicationQueuesClient( - new ReplicationQueuesClientArguments(s1.getConfiguration(), s1, s1.getZooKeeper())); - - int v0 = client.getQueuesZNodeCversion(); - List<String> queues = rq1.getUnClaimedQueueIds(s0.getServerName().getServerName()); - for(String queue : queues) { - rq1.claimQueue(s0.getServerName().getServerName(), queue); - } - rq1.removeReplicatorIfQueueIsEmpty(s0.getServerName().getServerName()); - int v1 = client.getQueuesZNodeCversion(); - // cversion should increase by 1 since a child node is deleted - assertEquals(v0 + 1, v1); - - s0.stop(""); - } }