HBASE-19599 Remove ReplicationQueuesClient, use ReplicationQueueStorage directly
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/15055a42 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/15055a42 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/15055a42 Branch: refs/heads/branch-2 Commit: 15055a421bbc789684af5bffadab04ed95b09568 Parents: 1de08de Author: zhangduo <zhang...@apache.org> Authored: Mon Dec 25 18:49:56 2017 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Fri Mar 9 20:55:48 2018 +0800 ---------------------------------------------------------------------- .../hbase/replication/ReplicationFactory.java | 19 +- .../replication/ReplicationPeersZKImpl.java | 21 +- .../replication/ReplicationQueueStorage.java | 26 +- .../replication/ReplicationQueuesClient.java | 93 ----- .../ReplicationQueuesClientArguments.java | 40 -- .../ReplicationQueuesClientZKImpl.java | 181 --------- .../replication/ZKReplicationQueueStorage.java | 90 ++++- .../replication/TestReplicationStateBasic.java | 378 +++++++++++++++++++ .../replication/TestReplicationStateZKImpl.java | 153 ++++++++ .../TestZKReplicationQueueStorage.java | 74 ++++ .../cleaner/ReplicationZKNodeCleaner.java | 71 ++-- .../cleaner/ReplicationZKNodeCleanerChore.java | 5 +- .../replication/ReplicationPeerManager.java | 31 +- .../master/ReplicationHFileCleaner.java | 109 ++---- .../master/ReplicationLogCleaner.java | 44 +-- .../regionserver/DumpReplicationQueues.java | 78 ++-- .../hbase/util/hbck/ReplicationChecker.java | 14 +- .../client/TestAsyncReplicationAdminApi.java | 31 +- .../replication/TestReplicationAdmin.java | 2 + .../hbase/master/cleaner/TestLogsCleaner.java | 81 ++-- .../cleaner/TestReplicationHFileCleaner.java | 29 -- .../cleaner/TestReplicationZKNodeCleaner.java | 12 +- .../replication/TestReplicationStateBasic.java | 378 ------------------- .../replication/TestReplicationStateZKImpl.java | 232 ------------ .../TestReplicationSourceManagerZkImpl.java | 41 -- 25 files changed, 890 
insertions(+), 1343 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/15055a42/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java index 9f4ad18..6c1c213 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -37,20 +36,14 @@ public class ReplicationFactory { args); } - public static ReplicationQueuesClient - getReplicationQueuesClient(ReplicationQueuesClientArguments args) throws Exception { - return (ReplicationQueuesClient) ConstructorUtils - .invokeConstructor(ReplicationQueuesClientZKImpl.class, args); - } - - public static ReplicationPeers getReplicationPeers(final ZKWatcher zk, Configuration conf, - Abortable abortable) { + public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf, + Abortable abortable) { return getReplicationPeers(zk, conf, null, abortable); } - public static ReplicationPeers getReplicationPeers(final ZKWatcher zk, Configuration conf, - final ReplicationQueuesClient queuesClient, Abortable abortable) { - return new ReplicationPeersZKImpl(zk, conf, queuesClient, abortable); + public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf, + ReplicationQueueStorage queueStorage, Abortable abortable) { + return new ReplicationPeersZKImpl(zk, conf, 
queueStorage, abortable); } public static ReplicationTracker getReplicationTracker(ZKWatcher zookeeper, http://git-wip-us.apache.org/repos/asf/hbase/blob/15055a42/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index 419e289..4e5f757 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.exceptions.DeserializationException; @@ -80,17 +81,17 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re // Map of peer clusters keyed by their id private Map<String, ReplicationPeerZKImpl> peerClusters; - private final ReplicationQueuesClient queuesClient; + private final ReplicationQueueStorage queueStorage; private Abortable abortable; private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeersZKImpl.class); - public ReplicationPeersZKImpl(final ZKWatcher zk, final Configuration conf, - final ReplicationQueuesClient queuesClient, Abortable abortable) { + public ReplicationPeersZKImpl(ZKWatcher zk, Configuration conf, + ReplicationQueueStorage queueStorage, Abortable abortable) { super(zk, conf, abortable); this.abortable = abortable; this.peerClusters = new 
ConcurrentHashMap<>(); - this.queuesClient = queuesClient; + this.queueStorage = queueStorage; } @Override @@ -510,14 +511,16 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } private void checkQueuesDeleted(String peerId) throws ReplicationException { - if (queuesClient == null) return; + if (queueStorage == null) { + return; + } try { - List<String> replicators = queuesClient.getListOfReplicators(); + List<ServerName> replicators = queueStorage.getListOfReplicators(); if (replicators == null || replicators.isEmpty()) { return; } - for (String replicator : replicators) { - List<String> queueIds = queuesClient.getAllQueues(replicator); + for (ServerName replicator : replicators) { + List<String> queueIds = queueStorage.getAllQueues(replicator); for (String queueId : queueIds) { ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); if (queueInfo.getPeerId().equals(peerId)) { @@ -528,7 +531,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } // Check for hfile-refs queue if (-1 != ZKUtil.checkExists(zookeeper, hfileRefsZNode) - && queuesClient.getAllPeersFromHFileRefsQueue().contains(peerId)) { + && queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) { throw new IllegalArgumentException("Undeleted queue for peerId: " + peerId + ", found in hfile-refs node path " + hfileRefsZNode); } http://git-wip-us.apache.org/repos/asf/hbase/blob/15055a42/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java index 7210d9a..e774148 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java +++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java @@ -78,6 +78,14 @@ public interface ReplicationQueueStorage { throws ReplicationException; /** + * Get a list of all WALs in the given queue on the given region server. + * @param serverName the server name of the region server that owns the queue + * @param queueId a String that identifies the queue + * @return a list of WALs + */ + List<String> getWALsInQueue(ServerName serverName, String queueId) throws ReplicationException; + + /** * Get a list of all queues for the specified region server. * @param serverName the server name of the region server that owns the set of queues * @return a list of queueIds @@ -108,8 +116,8 @@ public interface ReplicationQueueStorage { /** * Load all wals in all replication queues. This method guarantees to return a snapshot which - * contains all WALs in the zookeeper at the start of this call even there is concurrent queue - * failover. However, some newly created WALs during the call may not be included. + * contains all WALs at the start of this call even there is concurrent queue failover. However, + * some newly created WALs during the call may not be included. */ Set<String> getAllWALs() throws ReplicationException; @@ -143,13 +151,6 @@ public interface ReplicationQueueStorage { void removeHFileRefs(String peerId, List<String> files) throws ReplicationException; /** - * Get the change version number of replication hfile references node. This can be used as - * optimistic locking to get a consistent snapshot of the replication queues of hfile references. - * @return change version number of hfile references node - */ - int getHFileRefsNodeChangeVersion() throws ReplicationException; - - /** * Get list of all peers from hfile reference queue. 
* @return a list of peer ids */ @@ -161,4 +162,11 @@ public interface ReplicationQueueStorage { * @return a list of hfile references */ List<String> getReplicableHFiles(String peerId) throws ReplicationException; + + /** + * Load all hfile references in all replication queues. This method guarantees to return a + * snapshot which contains all hfile references at the start of this call. However, some newly + * created hfile references during the call may not be included. + */ + Set<String> getAllHFileRefs() throws ReplicationException; } http://git-wip-us.apache.org/repos/asf/hbase/blob/15055a42/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java deleted file mode 100644 index 2c513fa..0000000 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.replication; - -import java.util.List; -import java.util.Set; - -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.zookeeper.KeeperException; - -/** - * This provides an interface for clients of replication to view replication queues. These queues - * keep track of the sources(WALs/HFile references) that still need to be replicated to remote - * clusters. - */ -@InterfaceAudience.Private -public interface ReplicationQueuesClient { - - /** - * Initialize the replication queue client interface. - */ - public void init() throws ReplicationException; - - /** - * Get a list of all region servers that have outstanding replication queues. These servers could - * be alive, dead or from a previous run of the cluster. - * @return a list of server names - * @throws KeeperException zookeeper exception - */ - List<String> getListOfReplicators() throws KeeperException; - - /** - * Get a list of all WALs in the given queue on the given region server. - * @param serverName the server name of the region server that owns the queue - * @param queueId a String that identifies the queue - * @return a list of WALs, null if this region server is dead and has no outstanding queues - * @throws KeeperException zookeeper exception - */ - List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException; - - /** - * Get a list of all queues for the specified region server. - * @param serverName the server name of the region server that owns the set of queues - * @return a list of queueIds, null if this region server is not a replicator. - */ - List<String> getAllQueues(String serverName) throws KeeperException; - - /** - * Load all wals in all replication queues from ZK. This method guarantees to return a - * snapshot which contains all WALs in the zookeeper at the start of this call even there - * is concurrent queue failover. However, some newly created WALs during the call may - * not be included. 
- */ - Set<String> getAllWALs() throws KeeperException; - - /** - * Get the change version number of replication hfile references node. This can be used as - * optimistic locking to get a consistent snapshot of the replication queues of hfile references. - * @return change version number of hfile references node - */ - int getHFileRefsNodeChangeVersion() throws KeeperException; - - /** - * Get list of all peers from hfile reference queue. - * @return a list of peer ids - * @throws KeeperException zookeeper exception - */ - List<String> getAllPeersFromHFileRefsQueue() throws KeeperException; - - /** - * Get a list of all hfile references in the given peer. - * @param peerId a String that identifies the peer - * @return a list of hfile references, null if not found any - * @throws KeeperException zookeeper exception - */ - List<String> getReplicableHFiles(String peerId) throws KeeperException; -} http://git-wip-us.apache.org/repos/asf/hbase/blob/15055a42/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java deleted file mode 100644 index 9b79294..0000000 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.yetus.audience.InterfaceAudience; - -/** - * Wrapper around common arguments used to construct ReplicationQueuesClient. Used to construct - * various ReplicationQueuesClient Implementations with different constructor arguments by - * reflection. - */ -@InterfaceAudience.Private -public class ReplicationQueuesClientArguments extends ReplicationQueuesArguments { - public ReplicationQueuesClientArguments(Configuration conf, Abortable abort, - ZKWatcher zk) { - super(conf, abort, zk); - } - public ReplicationQueuesClientArguments(Configuration conf, Abortable abort) { - super(conf, abort); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/15055a42/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java deleted file mode 100644 index 0eeba19..0000000 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import java.util.List; -import java.util.Set; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; -import org.apache.hbase.thirdparty.com.google.common.collect.Sets; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.hadoop.hbase.zookeeper.ZNodePaths; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@InterfaceAudience.Private -public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implements - ReplicationQueuesClient { - - Logger LOG = LoggerFactory.getLogger(ReplicationQueuesClientZKImpl.class); - - public ReplicationQueuesClientZKImpl(ReplicationQueuesClientArguments args) { - this(args.getZk(), args.getConf(), args.getAbortable()); - } - - public ReplicationQueuesClientZKImpl(final ZKWatcher zk, Configuration conf, - Abortable abortable) { - super(zk, conf, abortable); - } - - @Override - public void init() throws ReplicationException { - try { - if 
(ZKUtil.checkExists(this.zookeeper, this.queuesZNode) < 0) { - ZKUtil.createWithParents(this.zookeeper, this.queuesZNode); - } - } catch (KeeperException e) { - throw new ReplicationException("Internal error while initializing a queues client", e); - } - } - - @Override - public List<String> getListOfReplicators() throws KeeperException { - return super.getListOfReplicatorsZK(); - } - - @Override - public List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException { - String znode = ZNodePaths.joinZNode(this.queuesZNode, serverName); - znode = ZNodePaths.joinZNode(znode, queueId); - List<String> result = null; - try { - result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); - } catch (KeeperException e) { - this.abortable.abort("Failed to get list of wals for queueId=" + queueId - + " and serverName=" + serverName, e); - throw e; - } - return result; - } - - @Override - public List<String> getAllQueues(String serverName) throws KeeperException { - String znode = ZNodePaths.joinZNode(this.queuesZNode, serverName); - List<String> result = null; - try { - result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); - } catch (KeeperException e) { - this.abortable.abort("Failed to get list of queues for serverName=" + serverName, e); - throw e; - } - return result; - } - - @Override - public Set<String> getAllWALs() throws KeeperException { - /** - * Load all wals in all replication queues from ZK. This method guarantees to return a - * snapshot which contains all WALs in the zookeeper at the start of this call even there - * is concurrent queue failover. However, some newly created WALs during the call may - * not be included. 
- */ - for (int retry = 0; ; retry++) { - int v0 = getQueuesZNodeCversion(); - List<String> rss = getListOfReplicators(); - if (rss == null || rss.isEmpty()) { - LOG.debug("Didn't find any region server that replicates, won't prevent any deletions."); - return ImmutableSet.of(); - } - Set<String> wals = Sets.newHashSet(); - for (String rs : rss) { - List<String> listOfPeers = getAllQueues(rs); - // if rs just died, this will be null - if (listOfPeers == null) { - continue; - } - for (String id : listOfPeers) { - List<String> peersWals = getLogsInQueue(rs, id); - if (peersWals != null) { - wals.addAll(peersWals); - } - } - } - int v1 = getQueuesZNodeCversion(); - if (v0 == v1) { - return wals; - } - LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d", - v0, v1, retry)); - } - } - - public int getQueuesZNodeCversion() throws KeeperException { - try { - Stat stat = new Stat(); - ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat); - return stat.getCversion(); - } catch (KeeperException e) { - this.abortable.abort("Failed to get stat of replication rs node", e); - throw e; - } - } - - @Override - public int getHFileRefsNodeChangeVersion() throws KeeperException { - Stat stat = new Stat(); - try { - ZKUtil.getDataNoWatch(this.zookeeper, this.hfileRefsZNode, stat); - } catch (KeeperException e) { - this.abortable.abort("Failed to get stat of replication hfile references node.", e); - throw e; - } - return stat.getCversion(); - } - - @Override - public List<String> getAllPeersFromHFileRefsQueue() throws KeeperException { - List<String> result = null; - try { - result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.hfileRefsZNode); - } catch (KeeperException e) { - this.abortable.abort("Failed to get list of all peers in hfile references node.", e); - throw e; - } - return result; - } - - @Override - public List<String> getReplicableHFiles(String peerId) throws KeeperException { - String znode = 
ZNodePaths.joinZNode(this.hfileRefsZNode, peerId); - List<String> result = null; - try { - result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); - } catch (KeeperException e) { - this.abortable.abort("Failed to get list of hfile references for peerId=" + peerId, e); - throw e; - } - return result; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/15055a42/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java index 7015d7f..0275d52 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.util.CollectionUtils.nullToEmpty; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.SortedSet; @@ -49,7 +50,7 @@ import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets; +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; /** * ZK based replication queue storage. 
@@ -61,7 +62,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationQueueStorage.class); public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY = - "zookeeper.znode.replication.hfile.refs"; + "zookeeper.znode.replication.hfile.refs"; public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs"; /** @@ -256,11 +257,23 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase } } - private List<String> getLogsInQueue0(ServerName serverName, String queueId) + private List<String> getWALsInQueue0(ServerName serverName, String queueId) throws KeeperException { return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, getQueueNode(serverName, queueId))); } + @Override + public List<String> getWALsInQueue(ServerName serverName, String queueId) + throws ReplicationException { + try { + return getWALsInQueue0(serverName, queueId); + } catch (KeeperException e) { + throw new ReplicationException( + "Failed to get wals in queue (serverName=" + serverName + ", queueId=" + queueId + ")", + e); + } + } + private List<String> getAllQueues0(ServerName serverName) throws KeeperException { return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, getRsNode(serverName))); } @@ -274,7 +287,9 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase } } - private int getQueuesZNodeCversion() throws KeeperException { + // will be overridden in UTs + @VisibleForTesting + protected int getQueuesZNodeCversion() throws KeeperException { Stat stat = new Stat(); ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat); return stat.getCversion(); @@ -290,10 +305,10 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase LOG.debug("Didn't find any region server that replicates, won't prevent any deletions."); return Collections.emptySet(); } - Set<String> wals = Sets.newHashSet(); + Set<String> wals = new HashSet<>(); for (ServerName 
rs : rss) { for (String queueId : getAllQueues0(rs)) { - wals.addAll(getLogsInQueue0(rs, queueId)); + wals.addAll(getWALsInQueue0(rs, queueId)); } } int v1 = getQueuesZNodeCversion(); @@ -356,9 +371,9 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase if (debugEnabled) { LOG.debug("Adding hfile references " + pairs + " in queue " + peerNode); } - List<ZKUtilOp> listOfOps = - pairs.stream().map(p -> p.getSecond().getName()).map(n -> getHFileNode(peerNode, n)) - .map(f -> ZKUtilOp.createAndFailSilent(f, HConstants.EMPTY_BYTE_ARRAY)).collect(toList()); + List<ZKUtilOp> listOfOps = pairs.stream().map(p -> p.getSecond().getName()) + .map(n -> getHFileNode(peerNode, n)) + .map(f -> ZKUtilOp.createAndFailSilent(f, HConstants.EMPTY_BYTE_ARRAY)).collect(toList()); if (debugEnabled) { LOG.debug("The multi list size for adding hfile references in zk for node " + peerNode + " is " + listOfOps.size()); @@ -391,35 +406,70 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase } } - @Override - public int getHFileRefsNodeChangeVersion() throws ReplicationException { - Stat stat = new Stat(); - try { - ZKUtil.getDataNoWatch(zookeeper, hfileRefsZNode, stat); - } catch (KeeperException e) { - throw new ReplicationException("Failed to get stat of replication hfile references node.", e); - } - return stat.getCversion(); + private List<String> getAllPeersFromHFileRefsQueue0() throws KeeperException { + return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, hfileRefsZNode)); } @Override public List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException { try { - return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, hfileRefsZNode)); + return getAllPeersFromHFileRefsQueue0(); } catch (KeeperException e) { throw new ReplicationException("Failed to get list of all peers in hfile references node.", e); } } + private List<String> getReplicableHFiles0(String peerId) throws KeeperException { + return 
nullToEmpty(ZKUtil.listChildrenNoWatch(this.zookeeper, getHFileRefsPeerNode(peerId))); + } + @Override public List<String> getReplicableHFiles(String peerId) throws ReplicationException { try { - return nullToEmpty(ZKUtil.listChildrenNoWatch(this.zookeeper, getHFileRefsPeerNode(peerId))); + return getReplicableHFiles0(peerId); } catch (KeeperException e) { throw new ReplicationException("Failed to get list of hfile references for peer " + peerId, e); } } + // will be overridden in UTs + @VisibleForTesting + protected int getHFileRefsZNodeCversion() throws ReplicationException { + Stat stat = new Stat(); + try { + ZKUtil.getDataNoWatch(zookeeper, hfileRefsZNode, stat); + } catch (KeeperException e) { + throw new ReplicationException("Failed to get stat of replication hfile references node.", e); + } + return stat.getCversion(); + } + + @Override + public Set<String> getAllHFileRefs() throws ReplicationException { + try { + for (int retry = 0;; retry++) { + int v0 = getHFileRefsZNodeCversion(); + List<String> peers = getAllPeersFromHFileRefsQueue(); + if (peers.isEmpty()) { + LOG.debug("Didn't find any peers with hfile references, won't prevent any deletions."); + return Collections.emptySet(); + } + Set<String> hfileRefs = new HashSet<>(); + for (String peer : peers) { + hfileRefs.addAll(getReplicableHFiles0(peer)); + } + int v1 = getHFileRefsZNodeCversion(); + if (v0 == v1) { + return hfileRefs; + } + LOG.debug(String.format( + "Replication hfile references node cversion changed from " + "%d to %d, retry = %d", v0, + v1, retry)); + } + } catch (KeeperException e) { + throw new ReplicationException("Failed to get all hfile refs", e); + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/15055a42/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java ---------------------------------------------------------------------- diff --git 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java new file mode 100644 index 0000000..6fe869c --- /dev/null +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -0,0 +1,378 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; +import org.apache.zookeeper.KeeperException; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * White box testing for replication state interfaces. 
Implementations should extend this class, and + * initialize the interfaces properly. + */ +public abstract class TestReplicationStateBasic { + + protected ReplicationQueues rq1; + protected ReplicationQueues rq2; + protected ReplicationQueues rq3; + protected ReplicationQueueStorage rqs; + protected ServerName server1 = ServerName.valueOf("hostname1.example.org", 1234, 12345); + protected ServerName server2 = ServerName.valueOf("hostname2.example.org", 1234, 12345); + protected ServerName server3 = ServerName.valueOf("hostname3.example.org", 1234, 12345); + protected ReplicationPeers rp; + protected static final String ID_ONE = "1"; + protected static final String ID_TWO = "2"; + protected static String KEY_ONE; + protected static String KEY_TWO; + + // For testing when we try to replicate to ourself + protected String OUR_ID = "3"; + protected String OUR_KEY; + + protected static int zkTimeoutCount; + protected static final int ZK_MAX_COUNT = 300; + protected static final int ZK_SLEEP_INTERVAL = 100; // millis + + private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class); + + @Test + public void testReplicationQueueStorage() throws ReplicationException { + // Test methods with empty state + assertEquals(0, rqs.getListOfReplicators().size()); + assertTrue(rqs.getWALsInQueue(server1, "qId1").isEmpty()); + assertTrue(rqs.getAllQueues(server1).isEmpty()); + + /* + * Set up data Two replicators: -- server1: three queues with 0, 1 and 2 log files each -- + * server2: zero queues + */ + rq1.init(server1.getServerName()); + rq2.init(server2.getServerName()); + rq1.addLog("qId1", "trash"); + rq1.removeLog("qId1", "trash"); + rq1.addLog("qId2", "filename1"); + rq1.addLog("qId3", "filename2"); + rq1.addLog("qId3", "filename3"); + rq2.addLog("trash", "trash"); + rq2.removeQueue("trash"); + + List<ServerName> reps = rqs.getListOfReplicators(); + assertEquals(2, reps.size()); + assertTrue(server1.getServerName(), reps.contains(server1)); + 
assertTrue(server2.getServerName(), reps.contains(server2)); + + assertTrue(rqs.getWALsInQueue(ServerName.valueOf("bogus", 12345, 12345), "bogus").isEmpty()); + assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty()); + assertEquals(0, rqs.getWALsInQueue(server1, "qId1").size()); + assertEquals(1, rqs.getWALsInQueue(server1, "qId2").size()); + assertEquals("filename1", rqs.getWALsInQueue(server1, "qId2").get(0)); + + assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 12345, -1L)).isEmpty()); + assertEquals(0, rqs.getAllQueues(server2).size()); + List<String> list = rqs.getAllQueues(server1); + assertEquals(3, list.size()); + assertTrue(list.contains("qId2")); + assertTrue(list.contains("qId3")); + } + + @Test + public void testReplicationQueues() throws ReplicationException { + rq1.init(server1.getServerName()); + rq2.init(server2.getServerName()); + rq3.init(server3.getServerName()); + // Initialize ReplicationPeer so we can add peers (we don't transfer lone queues) + rp.init(); + + // 3 replicators should exist + assertEquals(3, rq1.getListOfReplicators().size()); + rq1.removeQueue("bogus"); + rq1.removeLog("bogus", "bogus"); + rq1.removeAllQueues(); + assertEquals(0, rq1.getAllQueues().size()); + assertEquals(0, rq1.getLogPosition("bogus", "bogus")); + assertNull(rq1.getLogsInQueue("bogus")); + assertNull(rq1.getUnClaimedQueueIds(ServerName.valueOf("bogus", 1234, -1L).toString())); + + rq1.setLogPosition("bogus", "bogus", 5L); + + populateQueues(); + + assertEquals(3, rq1.getListOfReplicators().size()); + assertEquals(0, rq2.getLogsInQueue("qId1").size()); + assertEquals(5, rq3.getLogsInQueue("qId5").size()); + assertEquals(0, rq3.getLogPosition("qId1", "filename0")); + rq3.setLogPosition("qId5", "filename4", 354L); + assertEquals(354L, rq3.getLogPosition("qId5", "filename4")); + + assertEquals(5, rq3.getLogsInQueue("qId5").size()); + assertEquals(0, rq2.getLogsInQueue("qId1").size()); + assertEquals(0, rq1.getAllQueues().size()); + assertEquals(1, 
rq2.getAllQueues().size()); + assertEquals(5, rq3.getAllQueues().size()); + + assertEquals(0, rq3.getUnClaimedQueueIds(server1.getServerName()).size()); + rq3.removeReplicatorIfQueueIsEmpty(server1.getServerName()); + assertEquals(2, rq3.getListOfReplicators().size()); + + List<String> queues = rq2.getUnClaimedQueueIds(server3.getServerName()); + assertEquals(5, queues.size()); + for (String queue : queues) { + rq2.claimQueue(server3.getServerName(), queue); + } + rq2.removeReplicatorIfQueueIsEmpty(server3.getServerName()); + assertEquals(1, rq2.getListOfReplicators().size()); + + // Try to claim our own queues + assertNull(rq2.getUnClaimedQueueIds(server2.getServerName())); + rq2.removeReplicatorIfQueueIsEmpty(server2.getServerName()); + + assertEquals(6, rq2.getAllQueues().size()); + + rq2.removeAllQueues(); + + assertEquals(0, rq2.getListOfReplicators().size()); + } + + @Test + public void testInvalidClusterKeys() throws ReplicationException, KeeperException { + rp.init(); + + try { + rp.registerPeer(ID_ONE, + new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:hbase")); + fail("Should throw an IllegalArgumentException because " + + "zookeeper.znode.parent is missing leading '/'."); + } catch (IllegalArgumentException e) { + // Expected. + } + + try { + rp.registerPeer(ID_ONE, + new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:/")); + fail("Should throw an IllegalArgumentException because zookeeper.znode.parent is missing."); + } catch (IllegalArgumentException e) { + // Expected. + } + + try { + rp.registerPeer(ID_ONE, + new ReplicationPeerConfig().setClusterKey("hostname1.example.org::/hbase")); + fail("Should throw an IllegalArgumentException because " + + "hbase.zookeeper.property.clientPort is missing."); + } catch (IllegalArgumentException e) { + // Expected. 
+ } + } + + @Test + public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException { + rp.init(); + rq1.init(server1.getServerName()); + + List<Pair<Path, Path>> files1 = new ArrayList<>(3); + files1.add(new Pair<>(null, new Path("file_1"))); + files1.add(new Pair<>(null, new Path("file_2"))); + files1.add(new Pair<>(null, new Path("file_3"))); + assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty()); + assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size()); + rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); + rq1.addPeerToHFileRefs(ID_ONE); + rq1.addHFileRefs(ID_ONE, files1); + assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size()); + assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size()); + List<String> hfiles2 = new ArrayList<>(files1.size()); + for (Pair<Path, Path> p : files1) { + hfiles2.add(p.getSecond().getName()); + } + String removedString = hfiles2.remove(0); + rq1.removeHFileRefs(ID_ONE, hfiles2); + assertEquals(1, rqs.getReplicableHFiles(ID_ONE).size()); + hfiles2 = new ArrayList<>(1); + hfiles2.add(removedString); + rq1.removeHFileRefs(ID_ONE, hfiles2); + assertEquals(0, rqs.getReplicableHFiles(ID_ONE).size()); + rp.unregisterPeer(ID_ONE); + } + + @Test + public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException { + rq1.init(server1.getServerName()); + + rp.init(); + rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); + rq1.addPeerToHFileRefs(ID_ONE); + rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO)); + rq1.addPeerToHFileRefs(ID_TWO); + + List<Pair<Path, Path>> files1 = new ArrayList<>(3); + files1.add(new Pair<>(null, new Path("file_1"))); + files1.add(new Pair<>(null, new Path("file_2"))); + files1.add(new Pair<>(null, new Path("file_3"))); + rq1.addHFileRefs(ID_ONE, files1); + rq1.addHFileRefs(ID_TWO, files1); + assertEquals(2, rqs.getAllPeersFromHFileRefsQueue().size()); + assertEquals(3, 
rqs.getReplicableHFiles(ID_ONE).size()); + assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size()); + + rp.unregisterPeer(ID_ONE); + rq1.removePeerFromHFileRefs(ID_ONE); + assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size()); + assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty()); + assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size()); + + rp.unregisterPeer(ID_TWO); + rq1.removePeerFromHFileRefs(ID_TWO); + assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size()); + assertTrue(rqs.getReplicableHFiles(ID_TWO).isEmpty()); + } + + @Test + public void testReplicationPeers() throws Exception { + rp.init(); + + // Test methods with non-existent peer ids + try { + rp.unregisterPeer("bogus"); + fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); + } catch (IllegalArgumentException e) { + } + try { + rp.enablePeer("bogus"); + fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); + } catch (IllegalArgumentException e) { + } + try { + rp.disablePeer("bogus"); + fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); + } catch (IllegalArgumentException e) { + } + try { + rp.getStatusOfPeer("bogus"); + fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); + } catch (IllegalArgumentException e) { + } + assertFalse(rp.peerConnected("bogus")); + rp.peerDisconnected("bogus"); + + assertNull(rp.getPeerConf("bogus")); + assertNumberOfPeers(0); + + // Add some peers + rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); + assertNumberOfPeers(1); + rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO)); + assertNumberOfPeers(2); + + // Test methods with a peer that is added but not connected + try { + rp.getStatusOfPeer(ID_ONE); + fail("There are no connected peers, should have thrown an IllegalArgumentException"); + } catch (IllegalArgumentException e) { + } + assertEquals(KEY_ONE, 
ZKConfig.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond())); + rp.unregisterPeer(ID_ONE); + rp.peerDisconnected(ID_ONE); + assertNumberOfPeers(1); + + // Add one peer + rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); + rp.peerConnected(ID_ONE); + assertNumberOfPeers(2); + assertTrue(rp.getStatusOfPeer(ID_ONE)); + rp.disablePeer(ID_ONE); + // now we do not rely on zk watcher to trigger the state change so we need to trigger it + // manually... + assertEquals(PeerState.DISABLED, rp.getConnectedPeer(ID_ONE).getPeerState(true)); + assertConnectedPeerStatus(false, ID_ONE); + rp.enablePeer(ID_ONE); + // now we do not rely on zk watcher to trigger the state change so we need to trigger it + // manually... + assertEquals(PeerState.ENABLED, rp.getConnectedPeer(ID_ONE).getPeerState(true)); + assertConnectedPeerStatus(true, ID_ONE); + + // Disconnect peer + rp.peerDisconnected(ID_ONE); + assertNumberOfPeers(2); + try { + rp.getStatusOfPeer(ID_ONE); + fail("There are no connected peers, should have thrown an IllegalArgumentException"); + } catch (IllegalArgumentException e) { + } + } + + protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception { + // we can first check if the value was changed in the store, if it wasn't then fail right away + if (status != rp.getStatusOfPeerFromBackingStore(peerId)) { + fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK"); + } + while (true) { + if (status == rp.getStatusOfPeer(peerId)) { + return; + } + if (zkTimeoutCount++ < ZK_MAX_COUNT) { // count each retry so the wait can time out + LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status + + ", sleeping and trying again."); + Thread.sleep(ZK_SLEEP_INTERVAL); + } else { + fail("Timed out waiting for ConnectedPeerStatus to be " + status); + } + } + } + + protected void assertNumberOfPeers(int total) { + assertEquals(total, rp.getAllPeerConfigs().size()); + assertEquals(total, rp.getAllPeerIds().size()); +
assertEquals(total, rp.getAllPeerIds().size()); + } + + /* + * three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2, + * 3, 4, 5 log files respectively + */ + protected void populateQueues() throws ReplicationException { + rq1.addLog("trash", "trash"); + rq1.removeQueue("trash"); + + rq2.addLog("qId1", "trash"); + rq2.removeLog("qId1", "trash"); + + for (int i = 1; i < 6; i++) { + for (int j = 0; j < i; j++) { + rq3.addLog("qId" + i, "filename" + j); + } + // Add peers for the corresponding queues so they are not orphans + rp.registerPeer("qId" + i, + new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus" + i)); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/15055a42/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java new file mode 100644 index 0000000..6abe3f8 --- /dev/null +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.ClusterId; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseZKTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.zookeeper.ZKClusterId; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.zookeeper.KeeperException; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestReplicationStateZKImpl extends TestReplicationStateBasic { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReplicationStateZKImpl.class); + + private static final Logger LOG = 
LoggerFactory.getLogger(TestReplicationStateZKImpl.class); + + private static Configuration conf; + private static HBaseZKTestingUtility utility; + private static ZKWatcher zkw; + private static String replicationZNode; + private ReplicationQueuesZKImpl rqZK; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + utility = new HBaseZKTestingUtility(); + utility.startMiniZKCluster(); + conf = utility.getConfiguration(); + conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); + zkw = utility.getZooKeeperWatcher(); + String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); + replicationZNode = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, replicationZNodeName); + KEY_ONE = initPeerClusterState("/hbase1"); + KEY_TWO = initPeerClusterState("/hbase2"); + } + + private static String initPeerClusterState(String baseZKNode) + throws IOException, KeeperException { + // Add a dummy region server and set up the cluster id + Configuration testConf = new Configuration(conf); + testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode); + ZKWatcher zkw1 = new ZKWatcher(testConf, "test1", null); + String fakeRs = ZNodePaths.joinZNode(zkw1.znodePaths.rsZNode, "hostname1.example.org:1234"); + ZKUtil.createWithParents(zkw1, fakeRs); + ZKClusterId.setClusterId(zkw1, new ClusterId()); + return ZKConfig.getZooKeeperClusterKey(testConf); + } + + @Before + public void setUp() { + zkTimeoutCount = 0; + WarnOnlyAbortable abortable = new WarnOnlyAbortable(); + try { + rq1 = ReplicationFactory + .getReplicationQueues(new ReplicationQueuesArguments(conf, abortable, zkw)); + rq2 = ReplicationFactory + .getReplicationQueues(new ReplicationQueuesArguments(conf, abortable, zkw)); + rq3 = ReplicationFactory + .getReplicationQueues(new ReplicationQueuesArguments(conf, abortable, zkw)); + rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf); + } catch (Exception e) { + // This should not occur, because 
getReplicationQueues() only throws for + // TableBasedReplicationQueuesImpl + fail("ReplicationFactory.getReplicationQueues() threw an IO Exception"); + } + rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw); + OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf); + rqZK = new ReplicationQueuesZKImpl(zkw, conf, abortable); + } + + @After + public void tearDown() throws KeeperException, IOException { + ZKUtil.deleteNodeRecursively(zkw, replicationZNode); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + utility.shutdownMiniZKCluster(); + } + + @Test + public void testIsPeerPath_PathToParentOfPeerNode() { + assertFalse(rqZK.isPeerPath(rqZK.peersZNode)); + } + + @Test + public void testIsPeerPath_PathToChildOfPeerNode() { + String peerChild = ZNodePaths.joinZNode(ZNodePaths.joinZNode(rqZK.peersZNode, "1"), "child"); + assertFalse(rqZK.isPeerPath(peerChild)); + } + + @Test + public void testIsPeerPath_ActualPeerPath() { + String peerPath = ZNodePaths.joinZNode(rqZK.peersZNode, "1"); + assertTrue(rqZK.isPeerPath(peerPath)); + } + + private static class WarnOnlyAbortable implements Abortable { + + @Override + public void abort(String why, Throwable e) { + LOG.warn("TestReplicationStateZKImpl received abort, ignoring. 
Reason: " + why); + if (LOG.isDebugEnabled()) { + LOG.debug(e.toString(), e); + } + } + + @Override + public boolean isAborted() { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/15055a42/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java index d5bba0d..786730f 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java @@ -23,15 +23,18 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.Arrays; import java.util.List; import java.util.Set; import java.util.SortedSet; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseZKTestingUtility; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Pair; +import org.apache.zookeeper.KeeperException; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -115,6 +118,15 @@ public class TestZKReplicationQueueStorage { assertEquals(2, queueIds.size()); assertThat(queueIds, hasItems("1", "2")); + List<String> wals1 = STORAGE.getWALsInQueue(serverName1, queue1); + List<String> wals2 = STORAGE.getWALsInQueue(serverName1, queue2); + assertEquals(10, wals1.size()); + assertEquals(10, wals2.size()); + for (int i = 0; i < 10; i++) { + assertThat(wals1, hasItems(getFileName("file1", i))); + assertThat(wals2,
hasItems(getFileName("file2", i))); + } + for (int i = 0; i < 10; i++) { assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i))); assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i))); @@ -157,10 +169,20 @@ public class TestZKReplicationQueueStorage { queueIds = STORAGE.getAllQueues(serverName1); assertEquals(1, queueIds.size()); assertThat(queueIds, hasItems("2")); + wals2 = STORAGE.getWALsInQueue(serverName1, queue2); + assertEquals(5, wals2.size()); + for (i = 0; i < 10; i += 2) { + assertThat(wals2, hasItems(getFileName("file2", i))); + } queueIds = STORAGE.getAllQueues(serverName2); assertEquals(1, queueIds.size()); assertThat(queueIds, hasItems(peer1.getFirst())); + wals1 = STORAGE.getWALsInQueue(serverName2, peer1.getFirst()); + assertEquals(5, wals1.size()); + for (i = 1; i < 10; i += 2) { + assertThat(wals1, hasItems(getFileName("file1", i))); + } Set<String> allWals = STORAGE.getAllWALs(); assertEquals(10, allWals.size()); @@ -168,4 +190,56 @@ public class TestZKReplicationQueueStorage { assertThat(allWals, hasItems(i % 2 == 0 ? 
getFileName("file2", i) : getFileName("file1", i))); } } + + // For HBASE-12865 + @Test + public void testClaimQueueChangeCversion() throws ReplicationException, KeeperException { + ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000); + STORAGE.addWAL(serverName1, "1", "file"); + + int v0 = STORAGE.getQueuesZNodeCversion(); + ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001); + STORAGE.claimQueue(serverName1, "1", serverName2); + int v1 = STORAGE.getQueuesZNodeCversion(); + // cversion should increase by 1 since a child node is deleted + assertEquals(1, v1 - v0); + } + + private ZKReplicationQueueStorage createWithUnstableCversion() throws IOException { + return new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()) { + + private int called = 0; + + @Override + protected int getQueuesZNodeCversion() throws KeeperException { + if (called < 4) { + called++; + } + return called; + } + }; + } + + @Test + public void testGetAllWALsCversionChange() throws IOException, ReplicationException { + ZKReplicationQueueStorage storage = createWithUnstableCversion(); + storage.addWAL(getServerName(0), "1", "file"); + // This should return eventually when cversion stabilizes + Set<String> allWals = storage.getAllWALs(); + assertEquals(1, allWals.size()); + assertThat(allWals, hasItems("file")); + } + + // For HBASE-14621 + @Test + public void testGetAllHFileRefsCversionChange() throws IOException, ReplicationException { + ZKReplicationQueueStorage storage = createWithUnstableCversion(); + storage.addPeerToHFileRefs("1"); + Path p = new Path("/test"); + storage.addHFileRefs("1", Arrays.asList(Pair.newPair(p, p))); + // This should return eventually when cversion stabilizes + Set<String> allHFileRefs = storage.getAllHFileRefs(); + assertEquals(1, allHFileRefs.size()); + assertThat(allHFileRefs, hasItems("test")); + } } 
http://git-wip-us.apache.org/repos/asf/hbase/blob/15055a42/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java index 97deab5..af41399 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java @@ -23,21 +23,23 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.Map.Entry; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; @@ -48,23 +50,19 @@ import org.slf4j.LoggerFactory; @InterfaceAudience.Private public class ReplicationZKNodeCleaner { private static final Logger LOG = LoggerFactory.getLogger(ReplicationZKNodeCleaner.class); - private final ZKWatcher zkw; - private final ReplicationQueuesClient queuesClient; + private final ReplicationQueueStorage queueStorage; private final ReplicationPeers replicationPeers; private final ReplicationQueueDeletor queueDeletor; public ReplicationZKNodeCleaner(Configuration conf, ZKWatcher zkw, Abortable abortable) throws IOException { try { - this.zkw = zkw; - this.queuesClient = ReplicationFactory - .getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf, abortable, zkw)); - this.queuesClient.init(); - this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.queuesClient, - abortable); + this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf); + this.replicationPeers = + ReplicationFactory.getReplicationPeers(zkw, conf, this.queueStorage, abortable); this.replicationPeers.init(); this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, abortable); - } catch (Exception e) { + } catch (ReplicationException e) { throw new IOException("failed to construct ReplicationZKNodeCleaner", e); } } @@ -73,16 +71,16 @@ public class ReplicationZKNodeCleaner { * @return undeletedQueues replicator with its queueIds for removed peers * @throws IOException */ - public Map<String, List<String>> getUnDeletedQueues() throws IOException { - Map<String, List<String>> undeletedQueues = new HashMap<>(); + public Map<ServerName, List<String>> getUnDeletedQueues() throws IOException { + Map<ServerName, List<String>> undeletedQueues = new HashMap<>(); Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds()); try { - List<String> replicators = this.queuesClient.getListOfReplicators(); + List<ServerName> replicators = this.queueStorage.getListOfReplicators(); if 
(replicators == null || replicators.isEmpty()) { return undeletedQueues; } - for (String replicator : replicators) { - List<String> queueIds = this.queuesClient.getAllQueues(replicator); + for (ServerName replicator : replicators) { + List<String> queueIds = this.queueStorage.getAllQueues(replicator); for (String queueId : queueIds) { ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); if (!peerIds.contains(queueInfo.getPeerId())) { @@ -96,7 +94,7 @@ public class ReplicationZKNodeCleaner { } } } - } catch (KeeperException ke) { + } catch (ReplicationException ke) { throw new IOException("Failed to get the replication queues of all replicators", ke); } return undeletedQueues; @@ -105,25 +103,21 @@ public class ReplicationZKNodeCleaner { /** * @return undeletedHFileRefsQueue replicator with its undeleted queueIds for removed peers in * hfile-refs queue - * @throws IOException */ public Set<String> getUnDeletedHFileRefsQueues() throws IOException { Set<String> undeletedHFileRefsQueue = new HashSet<>(); Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds()); String hfileRefsZNode = queueDeletor.getHfileRefsZNode(); try { - if (-1 == ZKUtil.checkExists(zkw, hfileRefsZNode)) { - return null; - } - List<String> listOfPeers = this.queuesClient.getAllPeersFromHFileRefsQueue(); + List<String> listOfPeers = this.queueStorage.getAllPeersFromHFileRefsQueue(); Set<String> peers = new HashSet<>(listOfPeers); peers.removeAll(peerIds); if (!peers.isEmpty()) { undeletedHFileRefsQueue.addAll(peers); } - } catch (KeeperException e) { - throw new IOException("Failed to get list of all peers from hfile-refs znode " - + hfileRefsZNode, e); + } catch (ReplicationException e) { + throw new IOException( + "Failed to get list of all peers from hfile-refs znode " + hfileRefsZNode, e); } return undeletedHFileRefsQueue; } @@ -137,21 +131,20 @@ public class ReplicationZKNodeCleaner { /** * @param replicator The regionserver which has undeleted queue * @param 
queueId The undeleted queue id - * @throws IOException */ - public void removeQueue(final String replicator, final String queueId) throws IOException { - String queueZnodePath = - ZNodePaths.joinZNode(ZNodePaths.joinZNode(this.queuesZNode, replicator), queueId); + public void removeQueue(final ServerName replicator, final String queueId) throws IOException { + String queueZnodePath = ZNodePaths + .joinZNode(ZNodePaths.joinZNode(this.queuesZNode, replicator.getServerName()), queueId); try { ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); if (!replicationPeers.getAllPeerIds().contains(queueInfo.getPeerId())) { ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath); - LOG.info("Successfully removed replication queue, replicator: " + replicator - + ", queueId: " + queueId); + LOG.info("Successfully removed replication queue, replicator: " + replicator + + ", queueId: " + queueId); } } catch (KeeperException e) { - throw new IOException("Failed to delete queue, replicator: " + replicator + ", queueId: " - + queueId); + throw new IOException( + "Failed to delete queue, replicator: " + replicator + ", queueId: " + queueId, e); } } @@ -183,9 +176,9 @@ public class ReplicationZKNodeCleaner { * @param undeletedQueues replicator with its queueIds for removed peers * @throws IOException */ - public void removeQueues(final Map<String, List<String>> undeletedQueues) throws IOException { - for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueues.entrySet()) { - String replicator = replicatorAndQueueIds.getKey(); + public void removeQueues(final Map<ServerName, List<String>> undeletedQueues) throws IOException { + for (Entry<ServerName, List<String>> replicatorAndQueueIds : undeletedQueues.entrySet()) { + ServerName replicator = replicatorAndQueueIds.getKey(); for (String queueId : replicatorAndQueueIds.getValue()) { queueDeletor.removeQueue(replicator, queueId); }
http://git-wip-us.apache.org/repos/asf/hbase/blob/15055a42/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java index 8d5df9b..19ca804 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.master.cleaner; import java.io.IOException; @@ -23,6 +22,7 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Stoppable; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -45,11 +45,10 @@ public class ReplicationZKNodeCleanerChore extends ScheduledChore { @Override protected void chore() { try { - Map<String, List<String>> undeletedQueues = cleaner.getUnDeletedQueues(); + Map<ServerName, List<String>> undeletedQueues = cleaner.getUnDeletedQueues(); cleaner.removeQueues(undeletedQueues); } catch (IOException e) { LOG.warn("Failed to clean replication zk node", e); } } - } http://git-wip-us.apache.org/repos/asf/hbase/blob/15055a42/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 5abd874..84abfeb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; @@ -147,28 +148,13 @@ public final class ReplicationPeerManager { } } - private ReplicationPeerConfig copy(ReplicationPeerConfig peerConfig) { - ReplicationPeerConfig copiedPeerConfig = new ReplicationPeerConfig(); - copiedPeerConfig.getConfiguration().putAll(peerConfig.getConfiguration()); - copiedPeerConfig.getPeerData().putAll(peerConfig.getPeerData()); - copiedPeerConfig.setTableCFsMap(peerConfig.getTableCFsMap()); - copiedPeerConfig.setNamespaces(peerConfig.getNamespaces()); - copiedPeerConfig.setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap()); - copiedPeerConfig.setExcludeNamespaces(peerConfig.getExcludeNamespaces()); - copiedPeerConfig.setBandwidth(peerConfig.getBandwidth()); - copiedPeerConfig.setReplicateAllUserTables(peerConfig.replicateAllUserTables()); - copiedPeerConfig.setClusterKey(peerConfig.getClusterKey()); - copiedPeerConfig.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl()); - return copiedPeerConfig; - } - public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) throws ReplicationException { if (peers.containsKey(peerId)) { // this 
should be a retry, just return return; } - ReplicationPeerConfig copiedPeerConfig = copy(peerConfig); + ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build(); peerStorage.addPeer(peerId, copiedPeerConfig, enabled); peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig)); } @@ -205,13 +191,14 @@ public final class ReplicationPeerManager { // the checking rules are too complicated here so we give up checking whether this is a retry. ReplicationPeerDescription desc = peers.get(peerId); ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig(); - ReplicationPeerConfig newPeerConfig = copy(peerConfig); + ReplicationPeerConfigBuilder newPeerConfigBuilder = + ReplicationPeerConfig.newBuilder(peerConfig); // we need to use the new conf to overwrite the old one. - newPeerConfig.getConfiguration().putAll(oldPeerConfig.getConfiguration()); - newPeerConfig.getConfiguration().putAll(peerConfig.getConfiguration()); - newPeerConfig.getPeerData().putAll(oldPeerConfig.getPeerData()); - newPeerConfig.getPeerData().putAll(peerConfig.getPeerData()); - + newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration()); + newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration()); + newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration()); + newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration()); + ReplicationPeerConfig newPeerConfig = newPeerConfigBuilder.build(); peerStorage.updatePeerConfig(peerId, newPeerConfig); peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig)); } http://git-wip-us.apache.org/repos/asf/hbase/blob/15055a42/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java index 5f1df44..7b62169 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java @@ -1,42 +1,43 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable - * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" - * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License - * for the specific language governing permissions and limitations under the License. +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ package org.apache.hadoop.hbase.replication.master; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hbase.thirdparty.com.google.common.base.Predicate; -import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; -import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; -import org.apache.hbase.thirdparty.com.google.common.collect.Sets; - import java.io.IOException; import java.util.Collections; -import java.util.List; import java.util.Set; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; -import org.apache.hadoop.hbase.replication.ReplicationFactory; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.zookeeper.KeeperException; +import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.base.Predicate; +import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; + /** * Implementation of a file cleaner that checks if a hfile is still scheduled for replication before * deleting it from hfile archive directory. 
@@ -45,7 +46,7 @@ import org.slf4j.LoggerFactory; public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate { private static final Logger LOG = LoggerFactory.getLogger(ReplicationHFileCleaner.class); private ZKWatcher zkw; - private ReplicationQueuesClient rqc; + private ReplicationQueueStorage rqs; private boolean stopped = false; @Override @@ -60,8 +61,8 @@ public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate { try { // The concurrently created new hfile entries in ZK may not be included in the return list, // but they won't be deleted because they're not in the checking set. - hfileRefs = loadHFileRefsFromPeers(); - } catch (KeeperException e) { + hfileRefs = rqs.getAllHFileRefs(); + } catch (ReplicationException e) { LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable files"); return Collections.emptyList(); } @@ -82,37 +83,6 @@ public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate { }); } - /** - * Load all hfile references in all replication queues from ZK. This method guarantees to return a - * snapshot which contains all hfile references in the zookeeper at the start of this call. - * However, some newly created hfile references during the call may not be included. 
- */ - private Set<String> loadHFileRefsFromPeers() throws KeeperException { - Set<String> hfileRefs = Sets.newHashSet(); - List<String> listOfPeers; - for (int retry = 0;; retry++) { - int v0 = rqc.getHFileRefsNodeChangeVersion(); - hfileRefs.clear(); - listOfPeers = rqc.getAllPeersFromHFileRefsQueue(); - if (listOfPeers == null) { - LOG.debug("Didn't find any peers with hfile references, won't prevent any deletions."); - return ImmutableSet.of(); - } - for (String id : listOfPeers) { - List<String> peerHFileRefs = rqc.getReplicableHFiles(id); - if (peerHFileRefs != null) { - hfileRefs.addAll(peerHFileRefs); - } - } - int v1 = rqc.getHFileRefsNodeChangeVersion(); - if (v0 == v1) { - return hfileRefs; - } - LOG.debug(String.format("Replication hfile references node cversion changed from " - + "%d to %d, retry = %d", v0, v1, retry)); - } - } - @Override public void setConf(Configuration config) { // If either replication or replication of bulk load hfiles is disabled, keep all members null @@ -139,17 +109,15 @@ public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate { public void setConf(Configuration conf, ZKWatcher zk) { super.setConf(conf); try { - initReplicationQueuesClient(conf, zk); + initReplicationQueueStorage(conf, zk); } catch (Exception e) { LOG.error("Error while configuring " + this.getClass().getName(), e); } } - private void initReplicationQueuesClient(Configuration conf, ZKWatcher zk) - throws Exception { + private void initReplicationQueueStorage(Configuration conf, ZKWatcher zk) { this.zkw = zk; - this.rqc = ReplicationFactory.getReplicationQueuesClient(new ReplicationQueuesClientArguments( - conf, new WarnOnlyAbortable(), zkw)); + this.rqs = ReplicationStorageFactory.getReplicationQueueStorage(zk, conf); } @Override @@ -179,25 +147,12 @@ public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate { } try { - hfileRefsFromQueue = loadHFileRefsFromPeers(); - } catch (KeeperException e) { + hfileRefsFromQueue = 
rqs.getAllHFileRefs(); + } catch (ReplicationException e) { LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable " + "file for " + fStat.getPath()); return false; } return !hfileRefsFromQueue.contains(fStat.getPath().getName()); } - - private static class WarnOnlyAbortable implements Abortable { - @Override - public void abort(String why, Throwable e) { - LOG.warn("ReplicationHFileCleaner received abort, ignoring. Reason: " + why); - LOG.debug(e.toString(), e); - } - - @Override - public boolean isAborted() { - return false; - } - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/15055a42/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java index 5128d58..e0e2c71 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -21,19 +20,16 @@ package org.apache.hadoop.hbase.replication.master; import java.io.IOException; import java.util.Collections; import java.util.Set; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate; -import org.apache.hadoop.hbase.replication.ReplicationFactory; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +45,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; public class ReplicationLogCleaner extends BaseLogCleanerDelegate { private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogCleaner.class); private ZKWatcher zkw; - private ReplicationQueuesClient replicationQueues; + private ReplicationQueueStorage queueStorage; private boolean stopped = false; private Set<String> wals; private long readZKTimestamp = 0; @@ -60,8 +56,8 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { try { // The concurrently created new WALs may not be included in the return list, // but they won't be deleted because they're not in the checking set. 
- wals = replicationQueues.getAllWALs(); - } catch (KeeperException e) { + wals = queueStorage.getAllWALs(); + } catch (ReplicationException e) { LOG.warn("Failed to read zookeeper, skipping checking deletable files"); wals = null; } @@ -108,22 +104,12 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { super.setConf(conf); try { this.zkw = zk; - this.replicationQueues = ReplicationFactory.getReplicationQueuesClient( - new ReplicationQueuesClientArguments(conf, new WarnOnlyAbortable(), zkw)); - this.replicationQueues.init(); + this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zk, conf); } catch (Exception e) { LOG.error("Error while configuring " + this.getClass().getName(), e); } } - @VisibleForTesting - public void setConf(Configuration conf, ZKWatcher zk, - ReplicationQueuesClient replicationQueuesClient) { - super.setConf(conf); - this.zkw = zk; - this.replicationQueues = replicationQueuesClient; - } - @Override public void stop(String why) { if (this.stopped) return; @@ -138,18 +124,4 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { public boolean isStopped() { return this.stopped; } - - public static class WarnOnlyAbortable implements Abortable { - - @Override - public void abort(String why, Throwable e) { - LOG.warn("ReplicationLogCleaner received abort, ignoring. Reason: " + why); - LOG.debug(e.toString(), e); - } - - @Override - public boolean isAborted() { - return false; - } - } }