http://git-wip-us.apache.org/repos/asf/hbase/blob/15055a42/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
index 6e27a21..d8f9625 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
@@ -21,13 +21,13 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.stream.Collectors;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
@@ -48,17 +48,18 @@ import 
org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import 
org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicLongMap;
 
 /**
@@ -303,57 +304,53 @@ public class DumpReplicationQueues extends Configured 
implements Tool {
   }
 
   public String dumpQueues(ClusterConnection connection, ZKWatcher zkw, 
Set<String> peerIds,
-                           boolean hdfs) throws Exception {
-    ReplicationQueuesClient queuesClient;
+      boolean hdfs) throws Exception {
+    ReplicationQueueStorage queueStorage;
     ReplicationPeers replicationPeers;
     ReplicationQueues replicationQueues;
     ReplicationTracker replicationTracker;
-    ReplicationQueuesClientArguments replicationArgs =
-        new ReplicationQueuesClientArguments(getConf(), new 
WarnOnlyAbortable(), zkw);
+    ReplicationQueuesArguments replicationArgs =
+        new ReplicationQueuesArguments(getConf(), new WarnOnlyAbortable(), 
zkw);
     StringBuilder sb = new StringBuilder();
 
-    queuesClient = 
ReplicationFactory.getReplicationQueuesClient(replicationArgs);
-    queuesClient.init();
+    queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, 
getConf());
     replicationQueues = 
ReplicationFactory.getReplicationQueues(replicationArgs);
-    replicationPeers = ReplicationFactory.getReplicationPeers(zkw, getConf(), 
queuesClient, connection);
+    replicationPeers =
+        ReplicationFactory.getReplicationPeers(zkw, getConf(), queueStorage, 
connection);
     replicationTracker = ReplicationFactory.getReplicationTracker(zkw, 
replicationPeers, getConf(),
       new WarnOnlyAbortable(), new WarnOnlyStoppable());
-    List<String> liveRegionServers = 
replicationTracker.getListOfRegionServers();
+    Set<String> liveRegionServers = new 
HashSet<>(replicationTracker.getListOfRegionServers());
 
     // Loops each peer on each RS and dumps the queues
-    try {
-      List<String> regionservers = queuesClient.getListOfReplicators();
-      if (regionservers == null || regionservers.isEmpty()) {
-        return sb.toString();
+    List<ServerName> regionservers = queueStorage.getListOfReplicators();
+    if (regionservers == null || regionservers.isEmpty()) {
+      return sb.toString();
+    }
+    for (ServerName regionserver : regionservers) {
+      List<String> queueIds = queueStorage.getAllQueues(regionserver);
+      replicationQueues.init(regionserver.getServerName());
+      if (!liveRegionServers.contains(regionserver.getServerName())) {
+        deadRegionServers.add(regionserver.getServerName());
       }
-      for (String regionserver : regionservers) {
-        List<String> queueIds = queuesClient.getAllQueues(regionserver);
-        replicationQueues.init(regionserver);
-        if (!liveRegionServers.contains(regionserver)) {
-          deadRegionServers.add(regionserver);
-        }
-        for (String queueId : queueIds) {
-          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
-          List<String> wals = queuesClient.getLogsInQueue(regionserver, 
queueId);
-          if (!peerIds.contains(queueInfo.getPeerId())) {
-            deletedQueues.add(regionserver + "/" + queueId);
-            sb.append(formatQueue(regionserver, replicationQueues, queueInfo, 
queueId, wals, true,
-              hdfs));
-          } else {
-            sb.append(formatQueue(regionserver, replicationQueues, queueInfo, 
queueId, wals, false,
-              hdfs));
-          }
+      for (String queueId : queueIds) {
+        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+        List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId);
+        if (!peerIds.contains(queueInfo.getPeerId())) {
+          deletedQueues.add(regionserver + "/" + queueId);
+          sb.append(
+            formatQueue(regionserver, replicationQueues, queueInfo, queueId, 
wals, true, hdfs));
+        } else {
+          sb.append(
+            formatQueue(regionserver, replicationQueues, queueInfo, queueId, 
wals, false, hdfs));
         }
       }
-    } catch (KeeperException ke) {
-      throw new IOException(ke);
     }
     return sb.toString();
   }
 
-  private String formatQueue(String regionserver, ReplicationQueues 
replicationQueues, ReplicationQueueInfo queueInfo,
-                           String queueId, List<String> wals, boolean 
isDeleted, boolean hdfs) throws Exception {
-
+  private String formatQueue(ServerName regionserver, ReplicationQueues 
replicationQueues,
+      ReplicationQueueInfo queueInfo, String queueId, List<String> wals, 
boolean isDeleted,
+      boolean hdfs) throws Exception {
     StringBuilder sb = new StringBuilder();
 
     List<ServerName> deadServers;
@@ -389,13 +386,14 @@ public class DumpReplicationQueues extends Configured 
implements Tool {
   /**
    *  return total size in bytes from a list of WALs
    */
-  private long getTotalWALSize(FileSystem fs, List<String> wals, String 
server) throws IOException {
+  private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName 
server)
+      throws IOException {
     long size = 0;
     FileStatus fileStatus;
 
     for (String wal : wals) {
       try {
-        fileStatus = (new WALLink(getConf(), server, wal)).getFileStatus(fs);
+        fileStatus = (new WALLink(getConf(), server.getServerName(), 
wal)).getFileStatus(fs);
       } catch (IOException e) {
         if (e instanceof FileNotFoundException) {
           numWalsNotFound++;

http://git-wip-us.apache.org/repos/asf/hbase/blob/15055a42/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
index 839b5ad..85fa729 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.util.hbck;
 
 import java.io.IOException;
@@ -27,22 +26,23 @@ import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.util.HBaseFsck;
 import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
 
-/*
+/**
  * Check and fix undeleted replication queues for removed peerId.
  */
 @InterfaceAudience.Private
 public class ReplicationChecker {
   private final ErrorReporter errorReporter;
   // replicator with its queueIds for removed peers
-  private Map<String, List<String>> undeletedQueueIds = new HashMap<>();
+  private Map<ServerName, List<String>> undeletedQueueIds = new HashMap<>();
   // replicator with its undeleted queueIds for removed peers in hfile-refs 
queue
   private Set<String> undeletedHFileRefsQueueIds = new HashSet<>();
   private final ReplicationZKNodeCleaner cleaner;
@@ -60,8 +60,8 @@ public class ReplicationChecker {
 
   public void checkUnDeletedQueues() throws IOException {
     undeletedQueueIds = cleaner.getUnDeletedQueues();
-    for (Entry<String, List<String>> replicatorAndQueueIds : 
undeletedQueueIds.entrySet()) {
-      String replicator = replicatorAndQueueIds.getKey();
+    for (Entry<ServerName, List<String>> replicatorAndQueueIds : 
undeletedQueueIds.entrySet()) {
+      ServerName replicator = replicatorAndQueueIds.getKey();
       for (String queueId : replicatorAndQueueIds.getValue()) {
         ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
         String msg = "Undeleted replication queue for removed peer found: "

http://git-wip-us.apache.org/repos/asf/hbase/blob/15055a42/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
index 9f3740f..cc57dfb 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -34,12 +35,16 @@ import java.util.Set;
 import java.util.concurrent.CompletionException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
 import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -61,8 +66,8 @@ public class TestAsyncReplicationAdminApi extends 
TestAsyncAdminBase {
 
   private final String ID_ONE = "1";
   private final String KEY_ONE = "127.0.0.1:2181:/hbase";
-  private final String ID_SECOND = "2";
-  private final String KEY_SECOND = "127.0.0.1:2181:/hbase2";
+  private final String ID_TWO = "2";
+  private final String KEY_TWO = "127.0.0.1:2181:/hbase2";
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -70,21 +75,27 @@ public class TestAsyncReplicationAdminApi extends 
TestAsyncAdminBase {
     
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 
120000);
     
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
     TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
+    TEST_UTIL.getConfiguration().setInt(ReadOnlyZKClient.RECOVERY_RETRY, 1);
     TEST_UTIL.startMiniCluster();
     ASYNC_CONN = 
ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
   }
 
   @After
-  public void cleanupPeer() {
+  public void clearPeerAndQueues() throws IOException, ReplicationException {
     try {
       admin.removeReplicationPeer(ID_ONE).join();
     } catch (Exception e) {
-      LOG.debug("Replication peer " + ID_ONE + " may already be removed");
     }
     try {
-      admin.removeReplicationPeer(ID_SECOND).join();
+      admin.removeReplicationPeer(ID_TWO).join();
     } catch (Exception e) {
-      LOG.debug("Replication peer " + ID_SECOND + " may already be removed");
+    }
+    ReplicationQueueStorage queueStorage = ReplicationStorageFactory
+        .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), 
TEST_UTIL.getConfiguration());
+    for (ServerName serverName : queueStorage.getListOfReplicators()) {
+      for (String queue : queueStorage.getAllQueues(serverName)) {
+        queueStorage.removeQueue(serverName, queue);
+      }
     }
   }
 
@@ -93,7 +104,7 @@ public class TestAsyncReplicationAdminApi extends 
TestAsyncAdminBase {
     ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
     rpc1.setClusterKey(KEY_ONE);
     ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
-    rpc2.setClusterKey(KEY_SECOND);
+    rpc2.setClusterKey(KEY_TWO);
     // Add a valid peer
     admin.addReplicationPeer(ID_ONE, rpc1).join();
     // try adding the same (fails)
@@ -106,19 +117,19 @@ public class TestAsyncReplicationAdminApi extends 
TestAsyncAdminBase {
     assertEquals(1, admin.listReplicationPeers().get().size());
     // Try to remove an inexisting peer
     try {
-      admin.removeReplicationPeer(ID_SECOND).join();
+      admin.removeReplicationPeer(ID_TWO).join();
       fail("Test case should fail as removing a inexisting peer.");
     } catch (CompletionException e) {
       // OK!
     }
     assertEquals(1, admin.listReplicationPeers().get().size());
     // Add a second since multi-slave is supported
-    admin.addReplicationPeer(ID_SECOND, rpc2).join();
+    admin.addReplicationPeer(ID_TWO, rpc2).join();
     assertEquals(2, admin.listReplicationPeers().get().size());
     // Remove the first peer we added
     admin.removeReplicationPeer(ID_ONE).join();
     assertEquals(1, admin.listReplicationPeers().get().size());
-    admin.removeReplicationPeer(ID_SECOND).join();
+    admin.removeReplicationPeer(ID_TWO).join();
     assertEquals(0, admin.listReplicationPeers().get().size());
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/15055a42/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index 29a577b..6596c6c 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -50,6 +50,7 @@ import 
org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterR
 import 
org.apache.hadoop.hbase.replication.TestReplicationEndpoint.ReplicationEndpointForTest;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -89,6 +90,7 @@ public class TestReplicationAdmin {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+    TEST_UTIL.getConfiguration().setInt(ReadOnlyZKClient.RECOVERY_RETRY, 1);
     TEST_UTIL.startMiniCluster();
     admin = new ReplicationAdmin(TEST_UTIL.getConfiguration());
     hbaseAdmin = TEST_UTIL.getAdmin();

http://git-wip-us.apache.org/repos/asf/hbase/blob/15055a42/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index 210e5c9..dd7aa6a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,17 +22,12 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.doAnswer;
 
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.net.URLEncoder;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -54,9 +49,6 @@ import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -69,9 +61,6 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -207,28 +196,12 @@ public class TestLogsCleaner {
     }
   }
 
+  /**
+   * ReplicationLogCleaner should be able to ride over ZooKeeper errors 
without aborting.
+   */
   @Test
-  public void testZnodeCversionChange() throws Exception {
-    Configuration conf = TEST_UTIL.getConfiguration();
-    ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
-    cleaner.setConf(conf);
-
-    ReplicationQueuesClientZKImpl rqcMock = 
Mockito.mock(ReplicationQueuesClientZKImpl.class);
-    Mockito.when(rqcMock.getQueuesZNodeCversion()).thenReturn(1, 2, 3, 4);
-
-    Field rqc = 
ReplicationLogCleaner.class.getDeclaredField("replicationQueues");
-    rqc.setAccessible(true);
-
-    rqc.set(cleaner, rqcMock);
-
-    // This should return eventually when cversion stabilizes
-    cleaner.getDeletableFiles(new LinkedList<>());
-  }
-
-  @Test(timeout=10000)
-  public void testZooKeeperAbortDuringGetListOfReplicators() throws Exception {
+  public void testZooKeeperAbort() throws Exception {
     Configuration conf = TEST_UTIL.getConfiguration();
-
     ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
 
     List<FileStatus> dummyFiles = Lists.newArrayList(
@@ -236,37 +209,29 @@ public class TestLogsCleaner {
         new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new 
Path("log2"))
     );
 
-    FaultyZooKeeperWatcher faultyZK =
-        new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null);
-    final AtomicBoolean getListOfReplicatorsFailed = new AtomicBoolean(false);
-
-    try {
+    try (FaultyZooKeeperWatcher faultyZK = new FaultyZooKeeperWatcher(conf,
+        "testZooKeeperAbort-faulty", null)) {
       faultyZK.init();
-      ReplicationQueuesClient replicationQueuesClient = 
spy(ReplicationFactory.getReplicationQueuesClient(
-        new ReplicationQueuesClientArguments(conf, new 
ReplicationLogCleaner.WarnOnlyAbortable(), faultyZK)));
-      doAnswer(new Answer<Object>() {
-        @Override
-        public Object answer(InvocationOnMock invocation) throws Throwable {
-          try {
-            return invocation.callRealMethod();
-          } catch (KeeperException.ConnectionLossException e) {
-            getListOfReplicatorsFailed.set(true);
-            throw e;
-          }
-        }
-      }).when(replicationQueuesClient).getListOfReplicators();
-      replicationQueuesClient.init();
-
-      cleaner.setConf(conf, faultyZK, replicationQueuesClient);
-      // should keep all files due to a ConnectionLossException getting the 
queues znodes
+      cleaner.setConf(conf, faultyZK);
       cleaner.preClean();
+      // should keep all files due to a ConnectionLossException getting the 
queues znodes
       Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);
-
-      assertTrue(getListOfReplicatorsFailed.get());
       assertFalse(toDelete.iterator().hasNext());
       assertFalse(cleaner.isStopped());
-    } finally {
-      faultyZK.close();
+    }
+
+    // when zk is working both files should be returned
+    cleaner = new ReplicationLogCleaner();
+    try (ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", 
null)) {
+      cleaner.setConf(conf, zkw);
+      cleaner.preClean();
+      Iterable<FileStatus> filesToDelete = 
cleaner.getDeletableFiles(dummyFiles);
+      Iterator<FileStatus> iter = filesToDelete.iterator();
+      assertTrue(iter.hasNext());
+      assertEquals(new Path("log1"), iter.next().getPath());
+      assertTrue(iter.hasNext());
+      assertEquals(new Path("log2"), iter.next().getPath());
+      assertFalse(iter.hasNext());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/15055a42/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
index f0779af..102c8d9 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -25,7 +25,6 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -51,7 +50,6 @@ import 
org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -68,7 +66,6 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -200,32 +197,6 @@ public class TestReplicationHFileCleaner {
     assertTrue(deletableFilesIterator.next().getPath().equals(deletablefile));
   }
 
-  /*
-   * Test for HBASE-14621. This test will not assert directly anything. 
Without the fix the test
-   * will end up in a infinite loop, so it will timeout.
-   */
-  @Test
-  public void testForDifferntHFileRefsZnodeVersion() throws Exception {
-    // 1. Create a file
-    Path file = new Path(root, "testForDifferntHFileRefsZnodeVersion");
-    fs.createNewFile(file);
-    // 2. Assert file is successfully created
-    assertTrue("Test file not created!", fs.exists(file));
-    ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
-    cleaner.setConf(conf);
-
-    ReplicationQueuesClient replicationQueuesClient = 
Mockito.mock(ReplicationQueuesClient.class);
-    //Return different znode version for each call
-    
Mockito.when(replicationQueuesClient.getHFileRefsNodeChangeVersion()).thenReturn(1,
 2);
-
-    Class<? extends ReplicationHFileCleaner> cleanerClass = cleaner.getClass();
-    Field rqc = cleanerClass.getDeclaredField("rqc");
-    rqc.setAccessible(true);
-    rqc.set(cleaner, replicationQueuesClient);
-
-    cleaner.isFileDeletable(fs.getFileStatus(file));
-  }
-
   /**
    * ReplicationHFileCleaner should be able to ride over ZooKeeper errors 
without aborting.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/15055a42/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
index b0910e2..22ed1a3 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.master.cleaner;
 
 import static org.junit.Assert.assertEquals;
@@ -26,6 +25,7 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
@@ -49,9 +49,9 @@ public class TestReplicationZKNodeCleaner {
   private final static HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
 
   private final String ID_ONE = "1";
-  private final String SERVER_ONE = "server1";
+  private final ServerName SERVER_ONE = ServerName.valueOf("server1", 8000, 
1234);
   private final String ID_TWO = "2";
-  private final String SERVER_TWO = "server2";
+  private final ServerName SERVER_TWO = ServerName.valueOf("server2", 8000, 
1234);
 
   private final Configuration conf;
   private final ZKWatcher zkw;
@@ -78,12 +78,12 @@ public class TestReplicationZKNodeCleaner {
 
   @Test
   public void testReplicationZKNodeCleaner() throws Exception {
-    repQueues.init(SERVER_ONE);
+    repQueues.init(SERVER_ONE.getServerName());
     // add queue for ID_ONE which isn't exist
     repQueues.addLog(ID_ONE, "file1");
 
     ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, 
null);
-    Map<String, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
+    Map<ServerName, List<String>> undeletedQueues = 
cleaner.getUnDeletedQueues();
     assertEquals(1, undeletedQueues.size());
     assertTrue(undeletedQueues.containsKey(SERVER_ONE));
     assertEquals(1, undeletedQueues.get(SERVER_ONE).size());
@@ -106,7 +106,7 @@ public class TestReplicationZKNodeCleaner {
 
   @Test
   public void testReplicationZKNodeCleanerChore() throws Exception {
-    repQueues.init(SERVER_ONE);
+    repQueues.init(SERVER_ONE.getServerName());
     // add queue for ID_ONE which isn't exist
     repQueues.addLog(ID_ONE, "file1");
     // add a recovery queue for ID_TWO which isn't exist

http://git-wip-us.apache.org/repos/asf/hbase/blob/15055a42/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
deleted file mode 100644
index 29c0930..0000000
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication;
-
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
-import org.apache.zookeeper.KeeperException;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * White box testing for replication state interfaces. Implementations should 
extend this class, and
- * initialize the interfaces properly.
- */
-public abstract class TestReplicationStateBasic {
-
-  protected ReplicationQueues rq1;
-  protected ReplicationQueues rq2;
-  protected ReplicationQueues rq3;
-  protected ReplicationQueuesClient rqc;
-  protected String server1 = ServerName.valueOf("hostname1.example.org", 1234, 
-1L).toString();
-  protected String server2 = ServerName.valueOf("hostname2.example.org", 1234, 
-1L).toString();
-  protected String server3 = ServerName.valueOf("hostname3.example.org", 1234, 
-1L).toString();
-  protected ReplicationPeers rp;
-  protected static final String ID_ONE = "1";
-  protected static final String ID_TWO = "2";
-  protected static String KEY_ONE;
-  protected static String KEY_TWO;
-
-  // For testing when we try to replicate to ourself
-  protected String OUR_ID = "3";
-  protected String OUR_KEY;
-
-  protected static int zkTimeoutCount;
-  protected static final int ZK_MAX_COUNT = 300;
-  protected static final int ZK_SLEEP_INTERVAL = 100; // millis
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(TestReplicationStateBasic.class);
-
-  @Before
-  public void setUp() {
-    zkTimeoutCount = 0;
-  }
-
-  @Test
-  public void testReplicationQueuesClient() throws ReplicationException, 
KeeperException {
-    rqc.init();
-    // Test methods with empty state
-    assertEquals(0, rqc.getListOfReplicators().size());
-    assertNull(rqc.getLogsInQueue(server1, "qId1"));
-    assertNull(rqc.getAllQueues(server1));
-
-    /*
-     * Set up data Two replicators: -- server1: three queues with 0, 1 and 2 
log files each --
-     * server2: zero queues
-     */
-    rq1.init(server1);
-    rq2.init(server2);
-    rq1.addLog("qId1", "trash");
-    rq1.removeLog("qId1", "trash");
-    rq1.addLog("qId2", "filename1");
-    rq1.addLog("qId3", "filename2");
-    rq1.addLog("qId3", "filename3");
-    rq2.addLog("trash", "trash");
-    rq2.removeQueue("trash");
-
-    List<String> reps = rqc.getListOfReplicators();
-    assertEquals(2, reps.size());
-    assertTrue(server1, reps.contains(server1));
-    assertTrue(server2, reps.contains(server2));
-
-    assertNull(rqc.getLogsInQueue("bogus", "bogus"));
-    assertNull(rqc.getLogsInQueue(server1, "bogus"));
-    assertEquals(0, rqc.getLogsInQueue(server1, "qId1").size());
-    assertEquals(1, rqc.getLogsInQueue(server1, "qId2").size());
-    assertEquals("filename1", rqc.getLogsInQueue(server1, "qId2").get(0));
-
-    assertNull(rqc.getAllQueues("bogus"));
-    assertEquals(0, rqc.getAllQueues(server2).size());
-    List<String> list = rqc.getAllQueues(server1);
-    assertEquals(3, list.size());
-    assertTrue(list.contains("qId2"));
-    assertTrue(list.contains("qId3"));
-  }
-
-  @Test
-  public void testReplicationQueues() throws ReplicationException {
-    rq1.init(server1);
-    rq2.init(server2);
-    rq3.init(server3);
-    //Initialize ReplicationPeer so we can add peers (we don't transfer lone 
queues)
-    rp.init();
-
-    // 3 replicators should exist
-    assertEquals(3, rq1.getListOfReplicators().size());
-    rq1.removeQueue("bogus");
-    rq1.removeLog("bogus", "bogus");
-    rq1.removeAllQueues();
-    assertEquals(0, rq1.getAllQueues().size());
-    assertEquals(0, rq1.getLogPosition("bogus", "bogus"));
-    assertNull(rq1.getLogsInQueue("bogus"));
-    assertNull(rq1.getUnClaimedQueueIds(
-        ServerName.valueOf("bogus", 1234, -1L).toString()));
-
-    rq1.setLogPosition("bogus", "bogus", 5L);
-
-    populateQueues();
-
-    assertEquals(3, rq1.getListOfReplicators().size());
-    assertEquals(0, rq2.getLogsInQueue("qId1").size());
-    assertEquals(5, rq3.getLogsInQueue("qId5").size());
-    assertEquals(0, rq3.getLogPosition("qId1", "filename0"));
-    rq3.setLogPosition("qId5", "filename4", 354L);
-    assertEquals(354L, rq3.getLogPosition("qId5", "filename4"));
-
-    assertEquals(5, rq3.getLogsInQueue("qId5").size());
-    assertEquals(0, rq2.getLogsInQueue("qId1").size());
-    assertEquals(0, rq1.getAllQueues().size());
-    assertEquals(1, rq2.getAllQueues().size());
-    assertEquals(5, rq3.getAllQueues().size());
-
-    assertEquals(0, rq3.getUnClaimedQueueIds(server1).size());
-    rq3.removeReplicatorIfQueueIsEmpty(server1);
-    assertEquals(2, rq3.getListOfReplicators().size());
-
-    List<String> queues = rq2.getUnClaimedQueueIds(server3);
-    assertEquals(5, queues.size());
-    for(String queue: queues) {
-      rq2.claimQueue(server3, queue);
-    }
-    rq2.removeReplicatorIfQueueIsEmpty(server3);
-    assertEquals(1, rq2.getListOfReplicators().size());
-
-    // Try to claim our own queues
-    assertNull(rq2.getUnClaimedQueueIds(server2));
-    rq2.removeReplicatorIfQueueIsEmpty(server2);
-
-    assertEquals(6, rq2.getAllQueues().size());
-
-    rq2.removeAllQueues();
-
-    assertEquals(0, rq2.getListOfReplicators().size());
-  }
-
-  @Test
-  public void testInvalidClusterKeys() throws ReplicationException, 
KeeperException {
-    rp.init();
-
-    try {
-      rp.registerPeer(ID_ONE,
-        new 
ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:hbase"));
-      fail("Should throw an IllegalArgumentException because "
-            + "zookeeper.znode.parent is missing leading '/'.");
-    } catch (IllegalArgumentException e) {
-      // Expected.
-    }
-
-    try {
-      rp.registerPeer(ID_ONE,
-        new 
ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:/"));
-      fail("Should throw an IllegalArgumentException because 
zookeeper.znode.parent is missing.");
-    } catch (IllegalArgumentException e) {
-      // Expected.
-    }
-
-    try {
-      rp.registerPeer(ID_ONE,
-        new 
ReplicationPeerConfig().setClusterKey("hostname1.example.org::/hbase"));
-      fail("Should throw an IllegalArgumentException because "
-          + "hbase.zookeeper.property.clientPort is missing.");
-    } catch (IllegalArgumentException e) {
-      // Expected.
-    }
-  }
-
-  @Test
-  public void testHfileRefsReplicationQueues() throws ReplicationException, 
KeeperException {
-    rp.init();
-    rq1.init(server1);
-    rqc.init();
-
-    List<Pair<Path, Path>> files1 = new ArrayList<>(3);
-    files1.add(new Pair<>(null, new Path("file_1")));
-    files1.add(new Pair<>(null, new Path("file_2")));
-    files1.add(new Pair<>(null, new Path("file_3")));
-    assertNull(rqc.getReplicableHFiles(ID_ONE));
-    assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
-    rp.registerPeer(ID_ONE, new 
ReplicationPeerConfig().setClusterKey(KEY_ONE));
-    rq1.addPeerToHFileRefs(ID_ONE);
-    rq1.addHFileRefs(ID_ONE, files1);
-    assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
-    assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
-    List<String> hfiles2 = new ArrayList<>(files1.size());
-    for (Pair<Path, Path> p : files1) {
-      hfiles2.add(p.getSecond().getName());
-    }
-    String removedString = hfiles2.remove(0);
-    rq1.removeHFileRefs(ID_ONE, hfiles2);
-    assertEquals(1, rqc.getReplicableHFiles(ID_ONE).size());
-    hfiles2 = new ArrayList<>(1);
-    hfiles2.add(removedString);
-    rq1.removeHFileRefs(ID_ONE, hfiles2);
-    assertEquals(0, rqc.getReplicableHFiles(ID_ONE).size());
-    rp.unregisterPeer(ID_ONE);
-  }
-
-  @Test
-  public void testRemovePeerForHFileRefs() throws ReplicationException, 
KeeperException {
-    rq1.init(server1);
-    rqc.init();
-
-    rp.init();
-    rp.registerPeer(ID_ONE, new 
ReplicationPeerConfig().setClusterKey(KEY_ONE));
-    rq1.addPeerToHFileRefs(ID_ONE);
-    rp.registerPeer(ID_TWO, new 
ReplicationPeerConfig().setClusterKey(KEY_TWO));
-    rq1.addPeerToHFileRefs(ID_TWO);
-
-    List<Pair<Path, Path>> files1 = new ArrayList<>(3);
-    files1.add(new Pair<>(null, new Path("file_1")));
-    files1.add(new Pair<>(null, new Path("file_2")));
-    files1.add(new Pair<>(null, new Path("file_3")));
-    rq1.addHFileRefs(ID_ONE, files1);
-    rq1.addHFileRefs(ID_TWO, files1);
-    assertEquals(2, rqc.getAllPeersFromHFileRefsQueue().size());
-    assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
-    assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
-
-    rp.unregisterPeer(ID_ONE);
-    rq1.removePeerFromHFileRefs(ID_ONE);
-    assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
-    assertNull(rqc.getReplicableHFiles(ID_ONE));
-    assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
-
-    rp.unregisterPeer(ID_TWO);
-    rq1.removePeerFromHFileRefs(ID_TWO);
-    assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
-    assertNull(rqc.getReplicableHFiles(ID_TWO));
-  }
-
-  @Test
-  public void testReplicationPeers() throws Exception {
-    rp.init();
-
-    // Test methods with non-existent peer ids
-    try {
-      rp.unregisterPeer("bogus");
-      fail("Should have thrown an IllegalArgumentException when passed a bogus 
peerId");
-    } catch (IllegalArgumentException e) {
-    }
-    try {
-      rp.enablePeer("bogus");
-      fail("Should have thrown an IllegalArgumentException when passed a bogus 
peerId");
-    } catch (IllegalArgumentException e) {
-    }
-    try {
-      rp.disablePeer("bogus");
-      fail("Should have thrown an IllegalArgumentException when passed a bogus 
peerId");
-    } catch (IllegalArgumentException e) {
-    }
-    try {
-      rp.getStatusOfPeer("bogus");
-      fail("Should have thrown an IllegalArgumentException when passed a bogus 
peerId");
-    } catch (IllegalArgumentException e) {
-    }
-    assertFalse(rp.peerConnected("bogus"));
-    rp.peerDisconnected("bogus");
-
-    assertNull(rp.getPeerConf("bogus"));
-    assertNumberOfPeers(0);
-
-    // Add some peers
-    rp.registerPeer(ID_ONE, new 
ReplicationPeerConfig().setClusterKey(KEY_ONE));
-    assertNumberOfPeers(1);
-    rp.registerPeer(ID_TWO, new 
ReplicationPeerConfig().setClusterKey(KEY_TWO));
-    assertNumberOfPeers(2);
-
-    // Test methods with a peer that is added but not connected
-    try {
-      rp.getStatusOfPeer(ID_ONE);
-      fail("There are no connected peers, should have thrown an 
IllegalArgumentException");
-    } catch (IllegalArgumentException e) {
-    }
-    assertEquals(KEY_ONE, 
ZKConfig.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond()));
-    rp.unregisterPeer(ID_ONE);
-    rp.peerDisconnected(ID_ONE);
-    assertNumberOfPeers(1);
-
-    // Add one peer
-    rp.registerPeer(ID_ONE, new 
ReplicationPeerConfig().setClusterKey(KEY_ONE));
-    rp.peerConnected(ID_ONE);
-    assertNumberOfPeers(2);
-    assertTrue(rp.getStatusOfPeer(ID_ONE));
-    rp.disablePeer(ID_ONE);
-    assertConnectedPeerStatus(false, ID_ONE);
-    rp.enablePeer(ID_ONE);
-    assertConnectedPeerStatus(true, ID_ONE);
-
-    // Disconnect peer
-    rp.peerDisconnected(ID_ONE);
-    assertNumberOfPeers(2);
-    try {
-      rp.getStatusOfPeer(ID_ONE);
-      fail("There are no connected peers, should have thrown an 
IllegalArgumentException");
-    } catch (IllegalArgumentException e) {
-    }
-  }
-
-  protected void assertConnectedPeerStatus(boolean status, String peerId) 
throws Exception {
-    // we can first check if the value was changed in the store, if it wasn't 
then fail right away
-    if (status != rp.getStatusOfPeerFromBackingStore(peerId)) {
-      fail("ConnectedPeerStatus was " + !status + " but expected " + status + 
" in ZK");
-    }
-    while (true) {
-      if (status == rp.getStatusOfPeer(peerId)) {
-        return;
-      }
-      if (zkTimeoutCount < ZK_MAX_COUNT) {
-        LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + 
status
-            + ", sleeping and trying again.");
-        Thread.sleep(ZK_SLEEP_INTERVAL);
-      } else {
-        fail("Timed out waiting for ConnectedPeerStatus to be " + status);
-      }
-    }
-  }
-
-  protected void assertNumberOfPeers(int total) {
-    assertEquals(total, rp.getAllPeerConfigs().size());
-    assertEquals(total, rp.getAllPeerIds().size());
-    assertEquals(total, rp.getAllPeerIds().size());
-  }
-
-  /*
-   * three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 
has 5 queues with 1, 2,
-   * 3, 4, 5 log files respectively
-   */
-  protected void populateQueues() throws ReplicationException {
-    rq1.addLog("trash", "trash");
-    rq1.removeQueue("trash");
-
-    rq2.addLog("qId1", "trash");
-    rq2.removeLog("qId1", "trash");
-
-    for (int i = 1; i < 6; i++) {
-      for (int j = 0; j < i; j++) {
-        rq3.addLog("qId" + i, "filename" + j);
-      }
-      //Add peers for the corresponding queues so they are not orphans
-      rp.registerPeer("qId" + i, new 
ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus" + i));
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/hbase/blob/15055a42/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
deleted file mode 100644
index d51d3c3..0000000
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hbase.ChoreService;
-import org.apache.hadoop.hbase.ClusterId;
-import org.apache.hadoop.hbase.CoordinatedStateManager;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
-import org.apache.zookeeper.KeeperException;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Category({ReplicationTests.class, MediumTests.class})
-public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestReplicationStateZKImpl.class);
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(TestReplicationStateZKImpl.class);
-
-  private static Configuration conf;
-  private static HBaseTestingUtility utility;
-  private static ZKWatcher zkw;
-  private static String replicationZNode;
-  private ReplicationQueuesZKImpl rqZK;
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    utility = new HBaseTestingUtility();
-    utility.startMiniZKCluster();
-    conf = utility.getConfiguration();
-    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
-    zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
-    String replicationZNodeName = conf.get("zookeeper.znode.replication", 
"replication");
-    replicationZNode = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, 
replicationZNodeName);
-    KEY_ONE = initPeerClusterState("/hbase1");
-    KEY_TWO = initPeerClusterState("/hbase2");
-  }
-
-  private static String initPeerClusterState(String baseZKNode)
-      throws IOException, KeeperException {
-    // Add a dummy region server and set up the cluster id
-    Configuration testConf = new Configuration(conf);
-    testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode);
-    ZKWatcher zkw1 = new ZKWatcher(testConf, "test1", null);
-    String fakeRs = ZNodePaths.joinZNode(zkw1.znodePaths.rsZNode, 
"hostname1.example.org:1234");
-    ZKUtil.createWithParents(zkw1, fakeRs);
-    ZKClusterId.setClusterId(zkw1, new ClusterId());
-    return ZKConfig.getZooKeeperClusterKey(testConf);
-  }
-
-  @Before
-  @Override
-  public void setUp() {
-    super.setUp();
-    DummyServer ds1 = new DummyServer(server1);
-    DummyServer ds2 = new DummyServer(server2);
-    DummyServer ds3 = new DummyServer(server3);
-    try {
-      rq1 = ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(conf, ds1, zkw));
-      rq2 = ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(conf, ds2, zkw));
-      rq3 = ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(conf, ds3, zkw));
-      rqc = ReplicationFactory.getReplicationQueuesClient(
-        new ReplicationQueuesClientArguments(conf, ds1, zkw));
-    } catch (Exception e) {
-      // This should not occur, because getReplicationQueues() only throws for
-      // TableBasedReplicationQueuesImpl
-      fail("ReplicationFactory.getReplicationQueues() threw an IO Exception");
-    }
-    rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
-    OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
-    rqZK = new ReplicationQueuesZKImpl(zkw, conf, ds1);
-  }
-
-  @After
-  public void tearDown() throws KeeperException, IOException {
-    ZKUtil.deleteNodeRecursively(zkw, replicationZNode);
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    utility.shutdownMiniZKCluster();
-  }
-
-  @Test
-  public void testIsPeerPath_PathToParentOfPeerNode() {
-    assertFalse(rqZK.isPeerPath(rqZK.peersZNode));
-  }
-
-  @Test
-  public void testIsPeerPath_PathToChildOfPeerNode() {
-    String peerChild = 
ZNodePaths.joinZNode(ZNodePaths.joinZNode(rqZK.peersZNode, "1"), "child");
-    assertFalse(rqZK.isPeerPath(peerChild));
-  }
-
-  @Test
-  public void testIsPeerPath_ActualPeerPath() {
-    String peerPath = ZNodePaths.joinZNode(rqZK.peersZNode, "1");
-    assertTrue(rqZK.isPeerPath(peerPath));
-  }
-
-  static class DummyServer implements Server {
-    private String serverName;
-    private boolean isAborted = false;
-    private boolean isStopped = false;
-
-    public DummyServer(String serverName) {
-      this.serverName = serverName;
-    }
-
-    @Override
-    public Configuration getConfiguration() {
-      return conf;
-    }
-
-    @Override
-    public ZKWatcher getZooKeeper() {
-      return zkw;
-    }
-
-    @Override
-    public CoordinatedStateManager getCoordinatedStateManager() {
-      return null;
-    }
-
-    @Override
-    public ClusterConnection getConnection() {
-      return null;
-    }
-
-    @Override
-    public MetaTableLocator getMetaTableLocator() {
-      return null;
-    }
-
-    @Override
-    public ServerName getServerName() {
-      return ServerName.valueOf(this.serverName);
-    }
-
-    @Override
-    public void abort(String why, Throwable e) {
-      LOG.info("Aborting " + serverName);
-      this.isAborted = true;
-    }
-
-    @Override
-    public boolean isAborted() {
-      return this.isAborted;
-    }
-
-    @Override
-    public void stop(String why) {
-      this.isStopped = true;
-    }
-
-    @Override
-    public boolean isStopped() {
-      return this.isStopped;
-    }
-
-    @Override
-    public ChoreService getChoreService() {
-      return null;
-    }
-
-    @Override
-    public ClusterConnection getClusterConnection() {
-      // TODO Auto-generated method stub
-      return null;
-    }
-
-    @Override
-    public FileSystem getFileSystem() {
-      return null;
-    }
-
-    @Override
-    public boolean isStopping() {
-      return false;
-    }
-
-    @Override
-    public Connection createConnection(Configuration conf) throws IOException {
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/15055a42/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
index 64db7a8..4d81d1d 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
-
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.util.List;
@@ -31,8 +29,6 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -119,41 +115,4 @@ public class TestReplicationSourceManagerZkImpl extends 
TestReplicationSourceMan
 
     server.stop("");
   }
-
-  @Test
-  public void testFailoverDeadServerCversionChange() throws Exception {
-    final Server s0 = new DummyServer("cversion-change0.example.org");
-    ReplicationQueues repQueues =
-      ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(conf, s0,
-        s0.getZooKeeper()));
-    repQueues.init(s0.getServerName().toString());
-    // populate some znodes in the peer znode
-    files.add("log1");
-    files.add("log2");
-    for (String file : files) {
-      repQueues.addLog("1", file);
-    }
-    // simulate queue transfer
-    Server s1 = new DummyServer("cversion-change1.example.org");
-    ReplicationQueues rq1 =
-      ReplicationFactory.getReplicationQueues(new 
ReplicationQueuesArguments(s1.getConfiguration(), s1,
-        s1.getZooKeeper()));
-    rq1.init(s1.getServerName().toString());
-
-    ReplicationQueuesClientZKImpl client =
-      
(ReplicationQueuesClientZKImpl)ReplicationFactory.getReplicationQueuesClient(
-        new ReplicationQueuesClientArguments(s1.getConfiguration(), s1, 
s1.getZooKeeper()));
-
-    int v0 = client.getQueuesZNodeCversion();
-    List<String> queues = 
rq1.getUnClaimedQueueIds(s0.getServerName().getServerName());
-    for(String queue : queues) {
-      rq1.claimQueue(s0.getServerName().getServerName(), queue);
-    }
-    rq1.removeReplicatorIfQueueIsEmpty(s0.getServerName().getServerName());
-    int v1 = client.getQueuesZNodeCversion();
-    // cversion should increase by 1 since a child node is deleted
-    assertEquals(v0 + 1, v1);
-
-    s0.stop("");
-  }
 }

Reply via email to