HBASE-12770 Don't transfer all the queued hlogs of a dead server to the same alive server
Signed-off-by: zhangduo <zhang...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e5f9df1e Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e5f9df1e Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e5f9df1e Branch: refs/heads/hbase-12439 Commit: e5f9df1e2394994323b4a5bfe2d7ba58aa669acd Parents: 30d7eea Author: Phil Yang <ud1...@gmail.com> Authored: Thu Jul 21 16:33:44 2016 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Mon Aug 8 16:30:23 2016 +0800 ---------------------------------------------------------------------- .../hbase/replication/ReplicationQueues.java | 25 +- .../replication/ReplicationQueuesZKImpl.java | 263 ++++++++++--------- .../TableBasedReplicationQueuesImpl.java | 53 ++-- .../regionserver/ReplicationSourceManager.java | 26 +- .../replication/TestReplicationStateBasic.java | 17 +- .../TestReplicationStateHBaseImpl.java | 32 ++- .../TestReplicationSourceManager.java | 8 +- .../TestReplicationSourceManagerZkImpl.java | 28 +- 8 files changed, 264 insertions(+), 188 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/e5f9df1e/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java index 0de0cc8..0ae27d0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java @@ -19,10 +19,10 @@ package org.apache.hadoop.hbase.replication; import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.SortedSet; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Pair; /** * This provides an interface for maintaining a region server's replication queues. These queues @@ -94,12 +94,25 @@ public interface ReplicationQueues { List<String> getAllQueues(); /** - * Take ownership for the set of queues belonging to a dead region server. + * Get queueIds from a dead region server, whose queues has not been claimed by other region + * servers. + * @return empty if the queue exists but no children, null if the queue does not exist. + */ + List<String> getUnClaimedQueueIds(String regionserver); + + /** + * Take ownership for the queue identified by queueId and belongs to a dead region server. * @param regionserver the id of the dead region server - * @return A Map of the queues that have been claimed, including a Set of WALs in - * each queue. Returns an empty map if no queues were failed-over. + * @param queueId the id of the queue + * @return the new PeerId and A SortedSet of WALs in its queue, and null if no unclaimed queue. + */ + Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId); + + /** + * Remove the znode of region server if the queue is empty. + * @param regionserver */ - Map<String, Set<String>> claimQueues(String regionserver); + void removeReplicatorIfQueueIsEmpty(String regionserver); /** * Get a list of all region servers that have outstanding replication queues. These servers could http://git-wip-us.apache.org/repos/asf/hbase/blob/e5f9df1e/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java index 655aaae..c1e85cb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java @@ -19,11 +19,9 @@ package org.apache.hadoop.hbase.replication; import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; @@ -36,6 +34,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -179,21 +178,66 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R } @Override - public Map<String, Set<String>> claimQueues(String regionserverZnode) { - Map<String, Set<String>> newQueues = new HashMap<>(); - // check whether there is multi support. If yes, use it. + public List<String> getUnClaimedQueueIds(String regionserver) { + if (isThisOurRegionServer(regionserver)) { + return null; + } + String rsZnodePath = ZKUtil.joinZNode(this.queuesZNode, regionserver); + List<String> queues = null; + try { + queues = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZnodePath); + } catch (KeeperException e) { + this.abortable.abort("Failed to getUnClaimedQueueIds for RS" + regionserver, e); + } + if (queues != null) { + queues.remove(RS_LOCK_ZNODE); + } + return queues; + } + + @Override + public Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId) { if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) { - LOG.info("Atomically moving " + regionserverZnode + "'s WALs to my queue"); - newQueues = copyQueuesFromRSUsingMulti(regionserverZnode); + LOG.info("Atomically moving " + regionserver + "/" + queueId + "'s WALs to my queue"); + return moveQueueUsingMulti(regionserver, queueId); } else { - LOG.info("Moving " + regionserverZnode + "'s wals to my queue"); - if (!lockOtherRS(regionserverZnode)) { - return newQueues; + LOG.info("Moving " + regionserver + "/" + queueId + "'s wals to my queue"); + if (!lockOtherRS(regionserver)) { + LOG.info("Can not take the lock now"); + return null; } - newQueues = copyQueuesFromRS(regionserverZnode); - deleteAnotherRSQueues(regionserverZnode); + Pair<String, SortedSet<String>> newQueues; + try { + newQueues = copyQueueFromLockedRS(regionserver, queueId); + removeQueueFromLockedRS(regionserver, queueId); + } finally { + unlockOtherRS(regionserver); + } + return newQueues; + } + } + + private void removeQueueFromLockedRS(String znode, String peerId) { + String nodePath = ZKUtil.joinZNode(this.queuesZNode, znode); + String peerPath = ZKUtil.joinZNode(nodePath, peerId); + try { + ZKUtil.deleteNodeRecursively(this.zookeeper, peerPath); + } catch (KeeperException e) { + LOG.warn("Remove copied queue failed", e); + } + } + + @Override + public void removeReplicatorIfQueueIsEmpty(String regionserver) { + String rsPath = ZKUtil.joinZNode(this.queuesZNode, regionserver); + try { + List<String> list = ZKUtil.listChildrenNoWatch(this.zookeeper, rsPath); + if (list != null && list.size() == 0){ + ZKUtil.deleteNode(this.zookeeper, rsPath); + } + } catch (KeeperException e) { + LOG.warn("Got error while removing replicator", e); } - return newQueues; } @Override @@ -276,36 +320,13 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R return ZKUtil.checkExists(zookeeper, getLockZNode(znode)) >= 0; } - /** - * Delete all the replication queues for a given region server. - * @param regionserverZnode The znode of the region server to delete. - */ - private void deleteAnotherRSQueues(String regionserverZnode) { - String fullpath = ZKUtil.joinZNode(this.queuesZNode, regionserverZnode); + private void unlockOtherRS(String znode){ + String parent = ZKUtil.joinZNode(this.queuesZNode, znode); + String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE); try { - List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, fullpath); - for (String cluster : clusters) { - // No need to delete, it will be deleted later. - if (cluster.equals(RS_LOCK_ZNODE)) { - continue; - } - String fullClusterPath = ZKUtil.joinZNode(fullpath, cluster); - ZKUtil.deleteNodeRecursively(this.zookeeper, fullClusterPath); - } - // Finish cleaning up - ZKUtil.deleteNodeRecursively(this.zookeeper, fullpath); + ZKUtil.deleteNode(this.zookeeper, p); } catch (KeeperException e) { - if (e instanceof KeeperException.NoNodeException - || e instanceof KeeperException.NotEmptyException) { - // Testing a special case where another region server was able to - // create a lock just after we deleted it, but then was also able to - // delete the RS znode before us or its lock znode is still there. - if (e.getPath().equals(fullpath)) { - return; - } - } - this.abortable.abort("Failed to delete replication queues for region server: " - + regionserverZnode, e); + this.abortable.abort("Remove lock failed", e); } } @@ -313,38 +334,30 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R * It "atomically" copies all the wals queues from another region server and returns them all * sorted per peer cluster (appended with the dead server's znode). * @param znode pertaining to the region server to copy the queues from - * @return WAL queues sorted per peer cluster */ - private Map<String, Set<String>> copyQueuesFromRSUsingMulti(String znode) { - Map<String, Set<String>> queues = new HashMap<>(); - // hbase/replication/rs/deadrs - String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode); - List<String> peerIdsToProcess = null; - List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>(); + private Pair<String, SortedSet<String>> moveQueueUsingMulti(String znode, String peerId) { try { - peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath); - if (peerIdsToProcess == null) return queues; // node already processed - for (String peerId : peerIdsToProcess) { - ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId); - if (!peerExists(replicationQueueInfo.getPeerId())) { - // the orphaned queues must be moved, otherwise the delete op of dead rs will fail, - // this will cause the whole multi op fail. - // NodeFailoverWorker will skip the orphaned queues. - LOG.warn("Peer " + peerId - + " didn't exist, will move its queue to avoid the failure of multi op"); - } - String newPeerId = peerId + "-" + znode; - String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId); - // check the logs queue for the old peer cluster - String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId); - List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode); - if (wals == null || wals.size() == 0) { - listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode)); - continue; // empty log queue. - } + // hbase/replication/rs/deadrs + String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode); + List<ZKUtilOp> listOfOps = new ArrayList<>(); + ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId); + if (!peerExists(replicationQueueInfo.getPeerId())) { + // the orphaned queues must be moved, otherwise the delete op of dead rs will fail, + // this will cause the whole multi op fail. + // NodeFailoverWorker will skip the orphaned queues. + LOG.warn("Peer " + peerId + + " didn't exist, will move its queue to avoid the failure of multi op"); + } + String newPeerId = peerId + "-" + znode; + String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId); + // check the logs queue for the old peer cluster + String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId); + List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode); + SortedSet<String> logQueue = new TreeSet<>(); + if (wals == null || wals.size() == 0) { + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode)); + } else { // create the new cluster znode - Set<String> logQueue = new HashSet<String>(); - queues.put(newPeerId, logQueue); ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY); listOfOps.add(op); // get the offset of the logs and set it to new znodes @@ -354,98 +367,86 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R LOG.debug("Creating " + wal + " with data " + Bytes.toString(logOffset)); String newLogZnode = ZKUtil.joinZNode(newPeerZnode, wal); listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset)); - // add ops for deleting listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode)); logQueue.add(wal); } // add delete op for peer listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode)); + + if (LOG.isTraceEnabled()) + LOG.trace(" The multi list size is: " + listOfOps.size()); } - // add delete op for dead rs, this will update the cversion of the parent. - // The reader will make optimistic locking with this to get a consistent - // snapshot - listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath)); - if (LOG.isTraceEnabled()) LOG.trace(" The multi list size is: " + listOfOps.size()); ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false); - if (LOG.isTraceEnabled()) LOG.trace("Atomically moved the dead regionserver logs. "); + if (LOG.isTraceEnabled()) + LOG.trace("Atomically moved the dead regionserver logs. "); + return new Pair<>(newPeerId, logQueue); } catch (KeeperException e) { // Multi call failed; it looks like some other regionserver took away the logs. LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e); - queues.clear(); } catch (InterruptedException e) { LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e); - queues.clear(); Thread.currentThread().interrupt(); } - return queues; + return null; } /** - * This methods copies all the wals queues from another region server and returns them all sorted + * This methods moves all the wals queues from another region server and returns them all sorted * per peer cluster (appended with the dead server's znode) * @param znode server names to copy - * @return all wals for all peers of that cluster, null if an error occurred + * @return all wals for the peer of that cluster, null if an error occurred */ - private Map<String, Set<String>> copyQueuesFromRS(String znode) { + private Pair<String, SortedSet<String>> copyQueueFromLockedRS(String znode, String peerId) { // TODO this method isn't atomic enough, we could start copying and then // TODO fail for some reason and we would end up with znodes we don't want. - Map<String, Set<String>> queues = new HashMap<>(); try { String nodePath = ZKUtil.joinZNode(this.queuesZNode, znode); - List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath); - // We have a lock znode in there, it will count as one. - if (clusters == null || clusters.size() <= 1) { - return queues; + ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId); + String clusterPath = ZKUtil.joinZNode(nodePath, peerId); + if (!peerExists(replicationQueueInfo.getPeerId())) { + LOG.warn("Peer " + peerId + " didn't exist, skipping the replay"); + // Protection against moving orphaned queues + return null; } - // The lock isn't a peer cluster, remove it - clusters.remove(RS_LOCK_ZNODE); - for (String cluster : clusters) { - ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(cluster); - if (!peerExists(replicationQueueInfo.getPeerId())) { - LOG.warn("Peer " + cluster + " didn't exist, skipping the replay"); - // Protection against moving orphaned queues - continue; - } - // We add the name of the recovered RS to the new znode, we can even - // do that for queues that were recovered 10 times giving a znode like - // number-startcode-number-otherstartcode-number-anotherstartcode-etc - String newCluster = cluster + "-" + znode; - String newClusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, newCluster); - String clusterPath = ZKUtil.joinZNode(nodePath, cluster); - List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath); - // That region server didn't have anything to replicate for this cluster - if (wals == null || wals.size() == 0) { - continue; - } - ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode, + // We add the name of the recovered RS to the new znode, we can even + // do that for queues that were recovered 10 times giving a znode like + // number-startcode-number-otherstartcode-number-anotherstartcode-etc + String newCluster = peerId + "-" + znode; + String newClusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, newCluster); + + List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath); + // That region server didn't have anything to replicate for this cluster + if (wals == null || wals.size() == 0) { + return null; + } + ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode, HConstants.EMPTY_BYTE_ARRAY); - Set<String> logQueue = new HashSet<String>(); - queues.put(newCluster, logQueue); - for (String wal : wals) { - String z = ZKUtil.joinZNode(clusterPath, wal); - byte[] positionBytes = ZKUtil.getData(this.zookeeper, z); - long position = 0; - try { - position = ZKUtil.parseWALPositionFrom(positionBytes); - } catch (DeserializationException e) { - LOG.warn("Failed parse of wal position from the following znode: " + z - + ", Exception: " + e); - } - LOG.debug("Creating " + wal + " with data " + position); - String child = ZKUtil.joinZNode(newClusterZnode, wal); - // Position doesn't actually change, we are just deserializing it for - // logging, so just use the already serialized version - ZKUtil.createAndWatch(this.zookeeper, child, positionBytes); - logQueue.add(wal); + SortedSet<String> logQueue = new TreeSet<>(); + for (String wal : wals) { + String z = ZKUtil.joinZNode(clusterPath, wal); + byte[] positionBytes = ZKUtil.getData(this.zookeeper, z); + long position = 0; + try { + position = ZKUtil.parseWALPositionFrom(positionBytes); + } catch (DeserializationException e) { + LOG.warn("Failed parse of wal position from the following znode: " + z + + ", Exception: " + e); } + LOG.debug("Creating " + wal + " with data " + position); + String child = ZKUtil.joinZNode(newClusterZnode, wal); + // Position doesn't actually change, we are just deserializing it for + // logging, so just use the already serialized version + ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, child, positionBytes); + logQueue.add(wal); } + return new Pair<>(newCluster, logQueue); } catch (KeeperException e) { - this.abortable.abort("Copy queues from rs", e); + LOG.warn("Got exception in copyQueueFromLockedRS: ", e); } catch (InterruptedException e) { LOG.warn(e); Thread.currentThread().interrupt(); } - return queues; + return null; } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/e5f9df1e/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java index 3ee6fde..28b9bdf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -48,6 +49,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; /** * This class provides an implementation of the ReplicationQueues interface using an HBase table @@ -227,31 +230,54 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase return getAllQueues(serverName); } + @Override public List<String> getUnClaimedQueueIds(String regionserver) { + if (isThisOurRegionServer(regionserver)) { + return null; + } + try (ResultScanner queuesToClaim = getQueuesBelongingToServer(regionserver)) { + List<String> res = new ArrayList<>(); + for (Result queue : queuesToClaim) { + String rowKey = Bytes.toString(queue.getRow()); + res.add(rowKey); + } + return res.isEmpty() ? null : res; + } catch (IOException e) { + String errMsg = "Failed getUnClaimedQueueIds"; + abortable.abort(errMsg, e); + } + return null; + } + + @Override public void removeReplicatorIfQueueIsEmpty(String regionserver) { + // Do nothing here + } + @Override - public Map<String, Set<String>> claimQueues(String regionserver) { - Map<String, Set<String>> queues = new HashMap<>(); + public Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId) { if (isThisOurRegionServer(regionserver)) { - return queues; + return null; } - ResultScanner queuesToClaim = null; - try { - queuesToClaim = getQueuesBelongingToServer(regionserver); + + try (ResultScanner queuesToClaim = getQueuesBelongingToServer(regionserver)){ for (Result queue : queuesToClaim) { + String rowKey = Bytes.toString(queue.getRow()); + if (!rowKey.equals(queueId)){ + continue; + } if (attemptToClaimQueue(queue, regionserver)) { - String rowKey = Bytes.toString(queue.getRow()); ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(rowKey); if (replicationState.peerExists(replicationQueueInfo.getPeerId())) { - Set<String> sortedLogs = new HashSet<String>(); + SortedSet<String> sortedLogs = new TreeSet<>(); List<String> logs = getLogsInQueue(queue.getRow()); for (String log : logs) { sortedLogs.add(log); } - queues.put(rowKey, sortedLogs); LOG.info(serverName + " has claimed queue " + rowKey + " from " + regionserver); + return new Pair<>(rowKey, sortedLogs); } else { // Delete orphaned queues removeQueue(Bytes.toString(queue.getRow())); - LOG.info(serverName + " has deleted abandoned queue " + rowKey + " from " + + LOG.info(serverName + " has deleted abandoned queue " + queueId + " from " + regionserver); } } @@ -259,13 +285,8 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase } catch (IOException | KeeperException e) { String errMsg = "Failed claiming queues for regionserver=" + regionserver; abortable.abort(errMsg, e); - queues.clear(); - } finally { - if (queuesToClaim != null) { - queuesToClaim.close(); - } } - return queues; + return null; } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/e5f9df1e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 586aace..3cb7a84 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationTracker; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; /** @@ -647,10 +648,27 @@ public class ReplicationSourceManager implements ReplicationListener { LOG.info("Not transferring queue since we are shutting down"); return; } - Map<String, Set<String>> newQueues = null; - - newQueues = this.rq.claimQueues(rsZnode); - + Map<String, Set<String>> newQueues = new HashMap<>(); + List<String> peers = rq.getUnClaimedQueueIds(rsZnode); + while (peers != null && !peers.isEmpty()) { + Pair<String, SortedSet<String>> peer = this.rq.claimQueue(rsZnode, + peers.get(rand.nextInt(peers.size()))); + long sleep = sleepBeforeFailover/2; + if (peer != null) { + newQueues.put(peer.getFirst(), peer.getSecond()); + sleep = sleepBeforeFailover; + } + try { + Thread.sleep(sleep); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting before transferring a queue."); + Thread.currentThread().interrupt(); + } + peers = rq.getUnClaimedQueueIds(rsZnode); + } + if (peers != null) { + rq.removeReplicatorIfQueueIsEmpty(rsZnode); + } // Copying over the failed queue is completed. if (newQueues.isEmpty()) { // We either didn't get the lock or the failed region server didn't have any outstanding http://git-wip-us.apache.org/repos/asf/hbase/blob/e5f9df1e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java index 933c621..fcab105 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -22,8 +22,6 @@ import static org.junit.Assert.*; import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -124,7 +122,8 @@ public abstract class TestReplicationStateBasic { assertEquals(0, rq1.getAllQueues().size()); assertEquals(0, rq1.getLogPosition("bogus", "bogus")); assertNull(rq1.getLogsInQueue("bogus")); - assertEquals(0, rq1.claimQueues(ServerName.valueOf("bogus", 1234, -1L).toString()).size()); + assertNull(rq1.getUnClaimedQueueIds( + ServerName.valueOf("bogus", 1234, -1L).toString())); rq1.setLogPosition("bogus", "bogus", 5L); @@ -143,15 +142,21 @@ public abstract class TestReplicationStateBasic { assertEquals(1, rq2.getAllQueues().size()); assertEquals(5, rq3.getAllQueues().size()); - assertEquals(0, rq3.claimQueues(server1).size()); + assertEquals(0, rq3.getUnClaimedQueueIds(server1).size()); + rq3.removeReplicatorIfQueueIsEmpty(server1); assertEquals(2, rq3.getListOfReplicators().size()); - Map<String, Set<String>> queues = rq2.claimQueues(server3); + List<String> queues = rq2.getUnClaimedQueueIds(server3); assertEquals(5, queues.size()); + for(String queue: queues) { + rq2.claimQueue(server3, queue); + } + rq2.removeReplicatorIfQueueIsEmpty(server3); assertEquals(1, rq2.getListOfReplicators().size()); // Try to claim our own queues - assertEquals(0, rq2.claimQueues(server2).size()); + assertNull(rq2.getUnClaimedQueueIds(server2)); + rq2.removeReplicatorIfQueueIsEmpty(server2); assertEquals(6, rq2.getAllQueues().size()); http://git-wip-us.apache.org/repos/asf/hbase/blob/e5f9df1e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java index 7ec6df8..35c4e24 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -40,8 +41,6 @@ import org.junit.experimental.categories.Category; import java.io.IOException; import java.util.List; -import java.util.Map; -import java.util.Set; import static junit.framework.TestCase.assertNull; import static org.junit.Assert.assertEquals; @@ -267,18 +266,19 @@ public class TestReplicationStateHBaseImpl { } try { // Test claiming queues - Map<String, Set<String>> claimedQueuesFromRq2 = rq1.claimQueues(server2); + List<String> claimedQueuesFromRq2 = rq1.getUnClaimedQueueIds(server2); // Check to make sure that list of peers with outstanding queues is decremented by one // after claimQueues + // Check to make sure that we claimed the proper number of queues + assertEquals(2, claimedQueuesFromRq2.size()); + assertTrue(claimedQueuesFromRq2.contains("Queue1-" + server2)); + assertTrue(claimedQueuesFromRq2.contains("Queue2-" + server2)); + assertEquals(2, rq1.claimQueue(server2, "Queue1-" + server2).getSecond().size()); + assertEquals(1, rq1.claimQueue(server2, "Queue2-" + server2).getSecond().size()); + rq1.removeReplicatorIfQueueIsEmpty(server2); assertEquals(rq1.getListOfReplicators().size(), 2); assertEquals(rq2.getListOfReplicators().size(), 2); assertEquals(rq3.getListOfReplicators().size(), 2); - // Check to make sure that we claimed the proper number of queues - assertEquals(2, claimedQueuesFromRq2.size()); - assertTrue(claimedQueuesFromRq2.containsKey("Queue1-" + server2)); - assertTrue(claimedQueuesFromRq2.containsKey("Queue2-" + server2)); - assertEquals(2, claimedQueuesFromRq2.get("Queue1-" + server2).size()); - assertEquals(1, claimedQueuesFromRq2.get("Queue2-" + server2).size()); assertEquals(5, rq1.getAllQueues().size()); // Check that all the logs in the other queue were claimed assertEquals(2, rq1.getLogsInQueue("Queue1-" + server2).size()); @@ -294,7 +294,11 @@ public class TestReplicationStateHBaseImpl { rq1.addLog("UnclaimableQueue", "WALLogFile1.1"); rq1.addLog("UnclaimableQueue", "WALLogFile1.2"); assertEquals(6, rq1.getAllQueues().size()); - Map<String, Set<String>> claimedQueuesFromRq1 = rq3.claimQueues(server1); + List<String> claimedQueuesFromRq1 = rq3.getUnClaimedQueueIds(server1); + for(String queue : claimedQueuesFromRq1) { + rq3.claimQueue(server1, queue); + } + rq3.removeReplicatorIfQueueIsEmpty(server1); assertEquals(rq1.getListOfReplicators().size(), 1); assertEquals(rq2.getListOfReplicators().size(), 1); assertEquals(rq3.getListOfReplicators().size(), 1); @@ -302,12 +306,12 @@ public class TestReplicationStateHBaseImpl { // Replication Peers assertEquals(6, rq3.getAllQueues().size()); // Test claiming non-existing queues - Map<String, Set<String>> noQueues = rq3.claimQueues("NotARealServer"); - assertEquals(0, noQueues.size()); + List<String> noQueues = rq3.getUnClaimedQueueIds("NotARealServer"); + assertNull(noQueues); assertEquals(6, rq3.getAllQueues().size()); // Test claiming own queues - noQueues = rq3.claimQueues(server3); - assertEquals(0, noQueues.size()); + noQueues = rq3.getUnClaimedQueueIds(server3); + Assert.assertNull(noQueues); assertEquals(6, rq3.getAllQueues().size()); // Check that rq3 still remain on list of replicators assertEquals(1, rq3.getListOfReplicators().size()); http://git-wip-us.apache.org/repos/asf/hbase/blob/e5f9df1e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 7696e95..4ee783d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -75,6 +75,7 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; @@ -517,7 +518,12 @@ public abstract class TestReplicationSourceManager { @Override public void run() { try { - logZnodesMap = rq.claimQueues(deadRsZnode); + logZnodesMap = new HashMap<>(); + List<String> queues = rq.getUnClaimedQueueIds(deadRsZnode); + for(String queue:queues){ + Pair<String, SortedSet<String>> pair = rq.claimQueue(deadRsZnode, queue); + logZnodesMap.put(pair.getFirst(), pair.getSecond()); + } server.abort("Done with testing", null); } catch (Exception e) { LOG.error("Got exception while running NodeFailoverWorker", e); http://git-wip-us.apache.org/repos/asf/hbase/blob/e5f9df1e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java index 75ed835..a9d0766 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java @@ -36,8 +36,6 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import java.util.List; -import java.util.Map; -import java.util.Set; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -87,22 +85,28 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1, s1.getZooKeeper())); rq1.init(s1.getServerName().toString()); - Map<String, Set<String>> testMap = - rq1.claimQueues(server.getServerName().getServerName()); + String serverName = server.getServerName().getServerName(); + List<String> unclaimed = rq1.getUnClaimedQueueIds(serverName); + rq1.claimQueue(serverName, unclaimed.get(0)).getSecond(); + rq1.removeReplicatorIfQueueIsEmpty(unclaimed.get(0)); ReplicationQueues rq2 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s2.getConfiguration(), s2, s2.getZooKeeper())); rq2.init(s2.getServerName().toString()); - testMap = rq2.claimQueues(s1.getServerName().getServerName()); + serverName = s1.getServerName().getServerName(); + unclaimed = rq2.getUnClaimedQueueIds(serverName); + rq2.claimQueue(serverName, unclaimed.get(0)).getSecond(); + rq2.removeReplicatorIfQueueIsEmpty(unclaimed.get(0)); ReplicationQueues rq3 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s3.getConfiguration(), s3, s3.getZooKeeper())); rq3.init(s3.getServerName().toString()); - testMap = rq3.claimQueues(s2.getServerName().getServerName()); - - ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.keySet().iterator().next()); + serverName = s2.getServerName().getServerName(); + unclaimed = rq3.getUnClaimedQueueIds(serverName); + String queue3 = rq3.claimQueue(serverName, unclaimed.get(0)).getFirst(); + rq3.removeReplicatorIfQueueIsEmpty(unclaimed.get(0)); + ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queue3); List<String> result = replicationQueueInfo.getDeadRegionServers(); - // verify assertTrue(result.contains(server.getServerName().getServerName())); assertTrue(result.contains(s1.getServerName().getServerName())); @@ -137,7 +141,11 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan new ReplicationQueuesClientArguments(s1.getConfiguration(), s1, s1.getZooKeeper())); int v0 = client.getQueuesZNodeCversion(); - rq1.claimQueues(s0.getServerName().getServerName()); + List<String> queues = rq1.getUnClaimedQueueIds(s0.getServerName().getServerName()); + for(String queue : queues) { + rq1.claimQueue(s0.getServerName().getServerName(), queue); + } + rq1.removeReplicatorIfQueueIsEmpty(s0.getServerName().getServerName()); int v1 = client.getQueuesZNodeCversion(); // cversion should increase by 1 since a child node is deleted assertEquals(v0 + 1, v1);