HBASE-18282 ReplicationLogCleaner can delete WALs not yet replicated in case of a KeeperException
Signed-off-by: Andrew Purtell <apurt...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0743bda0 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0743bda0 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0743bda0 Branch: refs/heads/branch-2 Commit: 0743bda059d5edd6ca7bbb788c54970dc1212055 Parents: 9cbf936 Author: Ben Lau <ben...@oath.com> Authored: Tue Feb 13 17:13:50 2018 -0800 Committer: Andrew Purtell <apurt...@apache.org> Committed: Wed Feb 14 17:23:09 2018 -0800 ---------------------------------------------------------------------- .../hbase/replication/ReplicationQueues.java | 3 +- .../ReplicationQueuesClientZKImpl.java | 5 ++ .../replication/ReplicationQueuesZKImpl.java | 12 +++- .../replication/ReplicationStateZKBase.java | 8 ++- .../master/ReplicationLogCleaner.java | 12 +++- .../regionserver/ReplicationSourceManager.java | 17 ++++- .../hbase/master/cleaner/TestLogsCleaner.java | 70 ++++++++++++++++---- 7 files changed, 107 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/0743bda0/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java index 7f440b1..a2d21f7 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java @@ -119,8 +119,9 @@ public interface ReplicationQueues { * Get a list of all region servers that have outstanding replication queues. These servers could * be alive, dead or from a previous run of the cluster. * @return a list of server names + * @throws ReplicationException */ - List<String> getListOfReplicators(); + List<String> getListOfReplicators() throws ReplicationException; /** * Checks if the provided znode is the same as this region server's http://git-wip-us.apache.org/repos/asf/hbase/blob/0743bda0/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java index e00a7a2..0eeba19 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java @@ -59,6 +59,11 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem throw new ReplicationException("Internal error while initializing a queues client", e); } } + + @Override + public List<String> getListOfReplicators() throws KeeperException { + return super.getListOfReplicatorsZK(); + } @Override public List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException { http://git-wip-us.apache.org/repos/asf/hbase/blob/0743bda0/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java index 7551cb7..40bdeb8 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java @@ -100,7 +100,17 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R } } } - + + @Override + public List<String> getListOfReplicators() throws ReplicationException { + try { + return super.getListOfReplicatorsZK(); + } catch (KeeperException e) { + LOG.warn("getListOfReplicators() from ZK failed", e); + throw new ReplicationException("getListOfReplicators() from ZK failed", e); + } + } + @Override public void removeQueue(String queueId) { try { http://git-wip-us.apache.org/repos/asf/hbase/blob/0743bda0/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java index 05bbc84..4e9479f 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java @@ -93,12 +93,18 @@ public abstract class ReplicationStateZKBase { this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName); } - public List<String> getListOfReplicators() { + /** + * Subclasses that use ZK explicitly can just call this directly while classes + * that are trying to hide internal details of storage can wrap the KeeperException + * into a ReplicationException or something else. + */ + protected List<String> getListOfReplicatorsZK() throws KeeperException { List<String> result = null; try { result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.queuesZNode); } catch (KeeperException e) { this.abortable.abort("Failed to get list of replicators", e); + throw e; } return result; } http://git-wip-us.apache.org/repos/asf/hbase/blob/0743bda0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java index d2e6d68..5128d58 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java @@ -115,7 +115,15 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { LOG.error("Error while configuring " + this.getClass().getName(), e); } } - + + @VisibleForTesting + public void setConf(Configuration conf, ZKWatcher zk, + ReplicationQueuesClient replicationQueuesClient) { + super.setConf(conf); + this.zkw = zk; + this.replicationQueues = replicationQueuesClient; + } + @Override public void stop(String why) { if (this.stopped) return; @@ -131,7 +139,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { return this.stopped; } - private static class WarnOnlyAbortable implements Abortable { + public static class WarnOnlyAbortable implements Abortable { @Override public void abort(String why, Throwable e) { http://git-wip-us.apache.org/repos/asf/hbase/blob/0743bda0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index c0c2333..8346824 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -792,7 +792,22 @@ public class ReplicationSourceManager implements ReplicationListener { @Override public void run() { - List<String> currentReplicators = replicationQueues.getListOfReplicators(); + List<String> currentReplicators = null; + while (currentReplicators == null) { + try { + currentReplicators = replicationQueues.getListOfReplicators(); + } catch (ReplicationException e1) { + LOG.warn("Failure in getListOfReplicators(), will retry later", e1); + try { + Thread.sleep(ThreadLocalRandom.current().nextInt(10000)); + } catch (InterruptedException e2) { + LOG.warn("Interrupted while waiting for list of replicators to be available, " + + "will not adopt any abandoned queues", e2); + Thread.currentThread().interrupt(); + break; + } + } + } if (currentReplicators == null || currentReplicators.isEmpty()) { return; } http://git-wip-us.apache.org/repos/asf/hbase/blob/0743bda0/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index 9c577fe..210e5c9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.doAnswer; import java.io.IOException; import java.lang.reflect.Field; @@ -30,6 +31,8 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; @@ -51,6 +54,8 @@ import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; +import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; +import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; import org.apache.hadoop.hbase.testclassification.MasterTests; @@ -59,13 +64,14 @@ import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -219,12 +225,10 @@ public class TestLogsCleaner { cleaner.getDeletableFiles(new LinkedList<>()); } - /** - * ReplicationLogCleaner should be able to ride over ZooKeeper errors without aborting. - */ - @Test - public void testZooKeeperAbort() throws Exception { + @Test(timeout=10000) + public void testZooKeeperAbortDuringGetListOfReplicators() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); + ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); List<FileStatus> dummyFiles = Lists.newArrayList( @@ -232,20 +236,56 @@ public class TestLogsCleaner { new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2")) ); - try (FaultyZooKeeperWatcher faultyZK = new FaultyZooKeeperWatcher(conf, - "testZooKeeperAbort-faulty", null)) { + FaultyZooKeeperWatcher faultyZK = + new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null); + final AtomicBoolean getListOfReplicatorsFailed = new AtomicBoolean(false); + + try { faultyZK.init(); - cleaner.setConf(conf, faultyZK); - cleaner.preClean(); + ReplicationQueuesClient replicationQueuesClient = spy(ReplicationFactory.getReplicationQueuesClient( + new ReplicationQueuesClientArguments(conf, new ReplicationLogCleaner.WarnOnlyAbortable(), faultyZK))); + doAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + try { + return invocation.callRealMethod(); + } catch (KeeperException.ConnectionLossException e) { + getListOfReplicatorsFailed.set(true); + throw e; + } + } + }).when(replicationQueuesClient).getListOfReplicators(); + replicationQueuesClient.init(); + + cleaner.setConf(conf, faultyZK, replicationQueuesClient); // should keep all files due to a ConnectionLossException getting the queues znodes + cleaner.preClean(); Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles); + + assertTrue(getListOfReplicatorsFailed.get()); assertFalse(toDelete.iterator().hasNext()); assertFalse(cleaner.isStopped()); + } finally { + faultyZK.close(); } + } + + /** + * When zk is working both files should be returned + * @throws Exception + */ + @Test(timeout=10000) + public void testZooKeeperNormal() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); - // when zk is working both files should be returned - cleaner = new ReplicationLogCleaner(); - try (ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null)) { + List<FileStatus> dummyFiles = Lists.newArrayList( + new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")), + new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2")) + ); + + ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null); + try { cleaner.setConf(conf, zkw); cleaner.preClean(); Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles); @@ -255,6 +295,8 @@ public class TestLogsCleaner { assertTrue(iter.hasNext()); assertEquals(new Path("log2"), iter.next().getPath()); assertFalse(iter.hasNext()); + } finally { + zkw.close(); } } @@ -395,7 +437,7 @@ public class TestLogsCleaner { public void init() throws Exception { this.zk = spy(super.getRecoverableZooKeeper()); doThrow(new KeeperException.ConnectionLossException()) - .when(zk).getData("/hbase/replication/rs", null, new Stat()); + .when(zk).getChildren("/hbase/replication/rs", null); } @Override