HBASE-18282 ReplicationLogCleaner can delete WALs not yet replicated in case of a KeeperException

Signed-off-by: Andrew Purtell <apurt...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0743bda0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0743bda0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0743bda0

Branch: refs/heads/branch-2
Commit: 0743bda059d5edd6ca7bbb788c54970dc1212055
Parents: 9cbf936
Author: Ben Lau <ben...@oath.com>
Authored: Tue Feb 13 17:13:50 2018 -0800
Committer: Andrew Purtell <apurt...@apache.org>
Committed: Wed Feb 14 17:23:09 2018 -0800

----------------------------------------------------------------------
 .../hbase/replication/ReplicationQueues.java    |  3 +-
 .../ReplicationQueuesClientZKImpl.java          |  5 ++
 .../replication/ReplicationQueuesZKImpl.java    | 12 +++-
 .../replication/ReplicationStateZKBase.java     |  8 ++-
 .../master/ReplicationLogCleaner.java           | 12 +++-
 .../regionserver/ReplicationSourceManager.java  | 17 ++++-
 .../hbase/master/cleaner/TestLogsCleaner.java   | 70 ++++++++++++++++----
 7 files changed, 107 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/0743bda0/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
index 7f440b1..a2d21f7 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
@@ -119,8 +119,9 @@ public interface ReplicationQueues {
    * Get a list of all region servers that have outstanding replication queues. These servers could
    * be alive, dead or from a previous run of the cluster.
    * @return a list of server names
+   * @throws ReplicationException
    */
-  List<String> getListOfReplicators();
+  List<String> getListOfReplicators() throws ReplicationException;
 
   /**
    * Checks if the provided znode is the same as this region server's

http://git-wip-us.apache.org/repos/asf/hbase/blob/0743bda0/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
index e00a7a2..0eeba19 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
@@ -59,6 +59,11 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
       throw new ReplicationException("Internal error while initializing a queues client", e);
     }
   }
+  
+  @Override
+  public List<String> getListOfReplicators() throws KeeperException {
+    return super.getListOfReplicatorsZK();
+  }
 
   @Override
   public List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/0743bda0/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index 7551cb7..40bdeb8 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -100,7 +100,17 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
       }
     }
   }
-
+  
+  @Override
+  public List<String> getListOfReplicators() throws ReplicationException {
+    try {
+      return super.getListOfReplicatorsZK();
+    } catch (KeeperException e) {
+      LOG.warn("getListOfReplicators() from ZK failed", e);
+      throw new ReplicationException("getListOfReplicators() from ZK failed", e);
+    }
+  }
+  
   @Override
   public void removeQueue(String queueId) {
     try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/0743bda0/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
index 05bbc84..4e9479f 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
@@ -93,12 +93,18 @@ public abstract class ReplicationStateZKBase {
     this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName);
   }
 
-  public List<String> getListOfReplicators() {
+  /**
+   * Subclasses that use ZK explicitly can just call this directly while classes
+   * that are trying to hide internal details of storage can wrap the KeeperException
+   * into a ReplicationException or something else.
+   */
+  protected List<String> getListOfReplicatorsZK() throws KeeperException {
     List<String> result = null;
     try {
       result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.queuesZNode);
     } catch (KeeperException e) {
       this.abortable.abort("Failed to get list of replicators", e);
+      throw e;
     }
     return result;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0743bda0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index d2e6d68..5128d58 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -115,7 +115,15 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
       LOG.error("Error while configuring " + this.getClass().getName(), e);
     }
   }
-
+  
+  @VisibleForTesting
+  public void setConf(Configuration conf, ZKWatcher zk, 
+      ReplicationQueuesClient replicationQueuesClient) {
+    super.setConf(conf);
+    this.zkw = zk;
+    this.replicationQueues = replicationQueuesClient;
+  }
+  
   @Override
   public void stop(String why) {
     if (this.stopped) return;
@@ -131,7 +139,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
     return this.stopped;
   }
 
-  private static class WarnOnlyAbortable implements Abortable {
+  public static class WarnOnlyAbortable implements Abortable {
 
     @Override
     public void abort(String why, Throwable e) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/0743bda0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index c0c2333..8346824 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -792,7 +792,22 @@ public class ReplicationSourceManager implements ReplicationListener {
 
     @Override
     public void run() {
-      List<String> currentReplicators = replicationQueues.getListOfReplicators();
+      List<String> currentReplicators = null;
+      while (currentReplicators == null) {
+        try {
+          currentReplicators = replicationQueues.getListOfReplicators();
+        } catch (ReplicationException e1) {
+          LOG.warn("Failure in getListOfReplicators(), will retry later", e1);
+          try {
+            Thread.sleep(ThreadLocalRandom.current().nextInt(10000));
+          } catch (InterruptedException e2) {
+            LOG.warn("Interrupted while waiting for list of replicators to be available, "
+                + "will not adopt any abandoned queues", e2);
+            Thread.currentThread().interrupt();
+            break;
+          }
+        }
+      }
       if (currentReplicators == null || currentReplicators.isEmpty()) {
         return;
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0743bda0/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index 9c577fe..210e5c9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.doAnswer;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -30,6 +31,8 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -51,6 +54,8 @@ import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -59,13 +64,14 @@ import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -219,12 +225,10 @@ public class TestLogsCleaner {
     cleaner.getDeletableFiles(new LinkedList<>());
   }
 
-  /**
-   * ReplicationLogCleaner should be able to ride over ZooKeeper errors without aborting.
-   */
-  @Test
-  public void testZooKeeperAbort() throws Exception {
+  @Test(timeout=10000)
+  public void testZooKeeperAbortDuringGetListOfReplicators() throws Exception {
     Configuration conf = TEST_UTIL.getConfiguration();
+
     ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
 
     List<FileStatus> dummyFiles = Lists.newArrayList(
@@ -232,20 +236,56 @@ public class TestLogsCleaner {
         new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2"))
     );
 
-    try (FaultyZooKeeperWatcher faultyZK = new FaultyZooKeeperWatcher(conf,
-        "testZooKeeperAbort-faulty", null)) {
+    FaultyZooKeeperWatcher faultyZK =
+        new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null);
+    final AtomicBoolean getListOfReplicatorsFailed = new AtomicBoolean(false);
+
+    try {
       faultyZK.init();
-      cleaner.setConf(conf, faultyZK);
-      cleaner.preClean();
+      ReplicationQueuesClient replicationQueuesClient = spy(ReplicationFactory.getReplicationQueuesClient(
+        new ReplicationQueuesClientArguments(conf, new ReplicationLogCleaner.WarnOnlyAbortable(), faultyZK)));
+      doAnswer(new Answer<Object>() {
+        @Override
+        public Object answer(InvocationOnMock invocation) throws Throwable {
+          try {
+            return invocation.callRealMethod();
+          } catch (KeeperException.ConnectionLossException e) {
+            getListOfReplicatorsFailed.set(true);
+            throw e;
+          }
+        }
+      }).when(replicationQueuesClient).getListOfReplicators();
+      replicationQueuesClient.init();
+
+      cleaner.setConf(conf, faultyZK, replicationQueuesClient);
       // should keep all files due to a ConnectionLossException getting the queues znodes
+      cleaner.preClean();
       Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);
+
+      assertTrue(getListOfReplicatorsFailed.get());
       assertFalse(toDelete.iterator().hasNext());
       assertFalse(cleaner.isStopped());
+    } finally {
+      faultyZK.close();
     }
+  }
+
+  /**
+   * When zk is working both files should be returned
+   * @throws Exception
+   */
+  @Test(timeout=10000)
+  public void testZooKeeperNormal() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
 
-    // when zk is working both files should be returned
-    cleaner = new ReplicationLogCleaner();
-    try (ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null)) {
+    List<FileStatus> dummyFiles = Lists.newArrayList(
+        new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")),
+        new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2"))
+    );
+    
+    ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null);
+    try {
       cleaner.setConf(conf, zkw);
       cleaner.preClean();
       Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
@@ -255,6 +295,8 @@ public class TestLogsCleaner {
       assertTrue(iter.hasNext());
       assertEquals(new Path("log2"), iter.next().getPath());
       assertFalse(iter.hasNext());
+    } finally {
+      zkw.close();
     }
   }
 
@@ -395,7 +437,7 @@ public class TestLogsCleaner {
     public void init() throws Exception {
       this.zk = spy(super.getRecoverableZooKeeper());
       doThrow(new KeeperException.ConnectionLossException())
-          .when(zk).getData("/hbase/replication/rs", null, new Stat());
+        .when(zk).getChildren("/hbase/replication/rs", null);
     }
 
     @Override

Reply via email to