Repository: hbase Updated Branches: refs/heads/branch-1.3 5824f2236 -> b0e1fdae3
HBASE-15888 Extend HBASE-12769 for bulk load data replication Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b0e1fdae Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b0e1fdae Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b0e1fdae Branch: refs/heads/branch-1.3 Commit: b0e1fdae346b64af4188cf5df29488617753416f Parents: 5824f22 Author: Ashish Singhi <ashishsin...@apache.org> Authored: Fri Jun 3 18:48:47 2016 +0530 Committer: Ashish Singhi <ashishsin...@apache.org> Committed: Fri Jun 3 18:48:47 2016 +0530 ---------------------------------------------------------------------- .../replication/ReplicationPeersZKImpl.java | 6 ++ .../hbase/util/hbck/ReplicationChecker.java | 59 ++++++++++++++++++-- 2 files changed, 61 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/b0e1fdae/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index ad634fa..b0d6e83 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -601,6 +601,12 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } } } + // Check for hfile-refs queue + if (-1 != ZKUtil.checkExists(zookeeper, hfileRefsZNode) + && queuesClient.getAllPeersFromHFileRefsQueue().contains(peerId)) { + throw new ReplicationException("Undeleted queue for peerId: " + peerId + + ", found in hfile-refs node path " + hfileRefsZNode); + } } catch (KeeperException e) { throw new 
ReplicationException("Could not check queues deleted with id=" + peerId, e); } http://git-wip-us.apache.org/repos/asf/hbase/blob/b0e1fdae/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java index bf44a50..64212c9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java @@ -51,16 +51,21 @@ import org.apache.zookeeper.KeeperException; @InterfaceAudience.Private public class ReplicationChecker { private static final Log LOG = LogFactory.getLog(ReplicationChecker.class); + private final ZooKeeperWatcher zkw; private ErrorReporter errorReporter; private ReplicationQueuesClient queuesClient; private ReplicationPeers replicationPeers; private ReplicationQueueDeletor queueDeletor; // replicator with its queueIds for removed peers private Map<String, List<String>> undeletedQueueIds = new HashMap<String, List<String>>(); - + // Set of un deleted hfile refs queue Ids + private Set<String> undeletedHFileRefsQueueIds = new HashSet<>(); + private final String hfileRefsZNode; + public ReplicationChecker(Configuration conf, ZooKeeperWatcher zkw, HConnection connection, ErrorReporter errorReporter) throws IOException { try { + this.zkw = zkw; this.errorReporter = errorReporter; this.queuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, conf, connection); this.queuesClient.init(); @@ -71,6 +76,13 @@ public class ReplicationChecker { } catch (ReplicationException e) { throw new IOException("failed to construct ReplicationChecker", e); } + + String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); + String replicationZNode = 
ZKUtil.joinZNode(this.zkw.baseZNode, replicationZNodeName); + String hfileRefsZNodeName = + conf.get(ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY, + ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT); + hfileRefsZNode = ZKUtil.joinZNode(replicationZNode, hfileRefsZNodeName); } public boolean hasUnDeletedQueues() { @@ -103,13 +115,37 @@ public class ReplicationChecker { } catch (KeeperException ke) { throw new IOException(ke); } + + checkUnDeletedHFileRefsQueues(peerIds); + } + + private void checkUnDeletedHFileRefsQueues(Set<String> peerIds) throws IOException { + try { + if (-1 == ZKUtil.checkExists(zkw, hfileRefsZNode)) { + return; + } + List<String> listOfPeers = this.queuesClient.getAllPeersFromHFileRefsQueue(); + Set<String> peers = new HashSet<>(listOfPeers); + peers.removeAll(peerIds); + if (!peers.isEmpty()) { + undeletedHFileRefsQueueIds.addAll(peers); + String msg = + "Undeleted replication hfile-refs queue for removed peer found: " + + undeletedHFileRefsQueueIds + " under hfile-refs node " + hfileRefsZNode; + errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, + msg); + } + } catch (KeeperException e) { + throw new IOException("Failed to get list of all peers from hfile-refs znode " + + hfileRefsZNode, e); + } } - + private static class ReplicationQueueDeletor extends ReplicationStateZKBase { public ReplicationQueueDeletor(ZooKeeperWatcher zk, Configuration conf, Abortable abortable) { super(zk, conf, abortable); } - + public void removeQueue(String replicator, String queueId) throws IOException { String queueZnodePath = ZKUtil.joinZNode(ZKUtil.joinZNode(this.queuesZNode, replicator), queueId); @@ -122,7 +158,7 @@ public class ReplicationChecker { } } } - + public void fixUnDeletedQueues() throws IOException { for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) { String replicator = replicatorAndQueueIds.getKey(); @@ -130,5 +166,20 @@ public 
class ReplicationChecker { queueDeletor.removeQueue(replicator, queueId); } } + fixUnDeletedHFileRefsQueue(); + } + + /** + * Recursively deletes the znode of every queue id held in undeletedHFileRefsQueueIds, resolved under hfileRefsZNode. + * @throws IOException if ZooKeeper fails to delete a queue znode; the KeeperException is kept as the cause + */ + private void fixUnDeletedHFileRefsQueue() throws IOException { + for (String hfileRefsQueueId : undeletedHFileRefsQueueIds) { + String node = ZKUtil.joinZNode(hfileRefsZNode, hfileRefsQueueId); + try { + ZKUtil.deleteNodeRecursively(this.zkw, node); + LOG.info("Successfully deleted hfile-refs queue " + hfileRefsQueueId + " from path " + + hfileRefsZNode); + } catch (KeeperException e) { + throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId + + " from path " + hfileRefsZNode, e); + } + } } }