Repository: hbase
Updated Branches:
  refs/heads/branch-1.3 5824f2236 -> b0e1fdae3


HBASE-15888 Extend HBASE-12769 for bulk load data replication


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b0e1fdae
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b0e1fdae
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b0e1fdae

Branch: refs/heads/branch-1.3
Commit: b0e1fdae346b64af4188cf5df29488617753416f
Parents: 5824f22
Author: Ashish Singhi <ashishsin...@apache.org>
Authored: Fri Jun 3 18:48:47 2016 +0530
Committer: Ashish Singhi <ashishsin...@apache.org>
Committed: Fri Jun 3 18:48:47 2016 +0530

----------------------------------------------------------------------
 .../replication/ReplicationPeersZKImpl.java     |  6 ++
 .../hbase/util/hbck/ReplicationChecker.java     | 59 ++++++++++++++++++--
 2 files changed, 61 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
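
HBASE-12769 taught hbck to report and clean up replication queues left in ZooKeeper for
peers that no longer exist; this change extends the same check and fix to the hfile
reference queues used by bulk load replication. For orientation only (not part of the
patch), the znode paths involved can be sketched as below, assuming the default layout;
the names are derived from the configuration keys read in ReplicationChecker:

    // Hypothetical illustration of the default znode layout this patch inspects.
    String baseZNode        = "/hbase";                                    // zookeeper.znode.parent (assumed default)
    String replicationZNode = ZKUtil.joinZNode(baseZNode, "replication");  // zookeeper.znode.replication
    String hfileRefsZNode   = ZKUtil.joinZNode(replicationZNode, "hfile-refs");
    // /hbase/replication/hfile-refs/<peerId>/... holds references to bulk-loaded hfiles still
    // pending replication; a <peerId> child with no matching configured peer is what the new
    // check reports and, under hbck's -fixReplication option, deletes.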


http://git-wip-us.apache.org/repos/asf/hbase/blob/b0e1fdae/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index ad634fa..b0d6e83 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -601,6 +601,12 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
           }
         }
       }
+      // Check for hfile-refs queue
+      if (-1 != ZKUtil.checkExists(zookeeper, hfileRefsZNode)
+          && queuesClient.getAllPeersFromHFileRefsQueue().contains(peerId)) {
+        throw new ReplicationException("Undeleted queue for peerId: " + peerId
+            + ", found in hfile-refs node path " + hfileRefsZNode);
+      }
     } catch (KeeperException e) {
       throw new ReplicationException("Could not check queues deleted with id=" 
+ peerId, e);
     }
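
The guard above mirrors the existing per-peer queue check. As a minimal caller-side sketch
(not part of this commit, and assuming checkQueuesDeleted() is still reached from the peer
registration path as in HBASE-12769; the peer id "1" and peerClusterKey are placeholders),
re-adding a peer whose bulk-load queue was never cleaned up would now fail fast:

    try (ReplicationAdmin admin = new ReplicationAdmin(conf)) {
      // Hypothetical re-add of a previously removed peer with stale hfile-refs state.
      admin.addPeer("1", new ReplicationPeerConfig().setClusterKey(peerClusterKey), null);
    } catch (ReplicationException e) {
      // e.g. "Undeleted queue for peerId: 1, found in hfile-refs node path /hbase/replication/hfile-refs"
    } catch (IOException e) {
      // connection setup/teardown failure, unrelated to the new check
    }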

http://git-wip-us.apache.org/repos/asf/hbase/blob/b0e1fdae/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
index bf44a50..64212c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
@@ -51,16 +51,21 @@ import org.apache.zookeeper.KeeperException;
 @InterfaceAudience.Private
 public class ReplicationChecker {
   private static final Log LOG = LogFactory.getLog(ReplicationChecker.class);
+  private final ZooKeeperWatcher zkw;
   private ErrorReporter errorReporter;
   private ReplicationQueuesClient queuesClient;
   private ReplicationPeers replicationPeers;
   private ReplicationQueueDeletor queueDeletor;
   // replicator with its queueIds for removed peers
   private Map<String, List<String>> undeletedQueueIds = new HashMap<String, List<String>>();
-  
+  // Set of undeleted hfile refs queue Ids
+  private Set<String> undeletedHFileRefsQueueIds = new HashSet<>();
+  private final String hfileRefsZNode;
+
   public ReplicationChecker(Configuration conf, ZooKeeperWatcher zkw, HConnection connection,
       ErrorReporter errorReporter) throws IOException {
     try {
+      this.zkw = zkw;
       this.errorReporter = errorReporter;
       this.queuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, conf, connection);
       this.queuesClient.init();
@@ -71,6 +76,13 @@ public class ReplicationChecker {
     } catch (ReplicationException e) {
       throw new IOException("failed to construct ReplicationChecker", e);
     }
+
+    String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
+    String replicationZNode = ZKUtil.joinZNode(this.zkw.baseZNode, replicationZNodeName);
+    String hfileRefsZNodeName =
+        conf.get(ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
+          ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
+    hfileRefsZNode = ZKUtil.joinZNode(replicationZNode, hfileRefsZNodeName);
   }
 
   public boolean hasUnDeletedQueues() {
@@ -103,13 +115,37 @@ public class ReplicationChecker {
     } catch (KeeperException ke) {
       throw new IOException(ke);
     }
+
+    checkUnDeletedHFileRefsQueues(peerIds);
+  }
+
+  private void checkUnDeletedHFileRefsQueues(Set<String> peerIds) throws IOException {
+    try {
+      if (-1 == ZKUtil.checkExists(zkw, hfileRefsZNode)) {
+        return;
+      }
+      List<String> listOfPeers = this.queuesClient.getAllPeersFromHFileRefsQueue();
+      Set<String> peers = new HashSet<>(listOfPeers);
+      peers.removeAll(peerIds);
+      if (!peers.isEmpty()) {
+        undeletedHFileRefsQueueIds.addAll(peers);
+        String msg =
+            "Undeleted replication hfile-refs queue for removed peer found: "
+                + undeletedHFileRefsQueueIds + " under hfile-refs node " + hfileRefsZNode;
+        errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
+          msg);
+      }
+    } catch (KeeperException e) {
+      throw new IOException("Failed to get list of all peers from hfile-refs 
znode "
+          + hfileRefsZNode, e);
+    }
   }
-  
+
   private static class ReplicationQueueDeletor extends ReplicationStateZKBase {
     public ReplicationQueueDeletor(ZooKeeperWatcher zk, Configuration conf, Abortable abortable) {
       super(zk, conf, abortable);
     }
-    
+
     public void removeQueue(String replicator, String queueId) throws IOException {
       String queueZnodePath = ZKUtil.joinZNode(ZKUtil.joinZNode(this.queuesZNode, replicator),
         queueId);
@@ -122,7 +158,7 @@ public class ReplicationChecker {
       }
     }
   }
-  
+
   public void fixUnDeletedQueues() throws IOException {
     for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) {
       String replicator = replicatorAndQueueIds.getKey();
@@ -130,5 +166,20 @@ public class ReplicationChecker {
         queueDeletor.removeQueue(replicator, queueId);
       }
     }
+    fixUnDeletedHFileRefsQueue();
+  }
+
+  private void fixUnDeletedHFileRefsQueue() throws IOException {
+    for (String hfileRefsQueueId : undeletedHFileRefsQueueIds) {
+      String node = ZKUtil.joinZNode(hfileRefsZNode, hfileRefsQueueId);
+      try {
+        ZKUtil.deleteNodeRecursively(this.zkw, node);
+        LOG.info("Successfully deleted hfile-refs queue " + hfileRefsQueueId + 
" from path "
+            + hfileRefsZNode);
+      } catch (KeeperException e) {
+        throw new IOException("Failed to delete hfile-refs queue " + 
hfileRefsQueueId
+            + " from path " + hfileRefsZNode);
+      }
+    }
   }
 }
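
Taken together, the new pieces slot into hbck's existing replication pass. A minimal sketch
of that flow (not part of this patch; the check entry point and the fix flag are assumptions
based on the HBaseFsck wiring introduced by HBASE-12769):

    // Hypothetical driver, simplified from how HBaseFsck is expected to use ReplicationChecker.
    ReplicationChecker checker = new ReplicationChecker(conf, zkw, connection, errorReporter);
    checker.checkUnDeletedQueues();     // assumed entry point; now also walks the hfile-refs znode
    if (checker.hasUnDeletedQueues() && fixReplication) {  // fixReplication: hypothetical flag
      checker.fixUnDeletedQueues();     // now also deletes leftover hfile-refs peer znodes
    }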
