Repository: hbase

Updated Branches:
  refs/heads/branch-1.3 ea3907da7 -> fd297e280
HBASE-12770 Don't transfer all the queued hlogs of a dead server to the same alive server

Signed-off-by: zhangduo <zhang...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fd297e28
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fd297e28
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fd297e28

Branch: refs/heads/branch-1.3
Commit: fd297e280f25c26346c3343d6ea1be4f0362821e
Parents: ea3907d
Author: Phil Yang <ud1...@gmail.com>
Authored: Thu Aug 4 19:33:01 2016 +0800
Committer: Mikhail Antonov <anto...@apache.org>
Committed: Tue Apr 4 20:03:14 2017 -0700

----------------------------------------------------------------------
 .../hbase/replication/ReplicationQueues.java    |  28 +-
 .../replication/ReplicationQueuesZKImpl.java    | 253 +++++++++++--------
 .../regionserver/ReplicationSourceManager.java  |  25 +-
 .../replication/TestReplicationStateBasic.java  |  16 +-
 .../TestReplicationSourceManager.java           |  37 ++-
 5 files changed, 231 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/fd297e28/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
index 507367b..1b1c770 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
@@ -23,6 +23,7 @@ import java.util.SortedMap;
 import java.util.SortedSet;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * This provides an interface for maintaining a region server's replication queues. These queues
@@ -94,14 +95,33 @@ public interface ReplicationQueues {
   List<String> getAllQueues();
 
   /**
-   * Take ownership for the set of queues belonging to a dead region server.
+   * Checks whether the provided znode is this region server's own znode.
+   * @param regionserver the id of the region server
+   * @return true if this is this region server's znode
+   */
+  boolean isThisOurRegionServer(String regionserver);
+
+  /**
+   * Get the queue ids from a dead region server whose queues have not yet been claimed by other
+   * region servers.
+   * @return an empty list if the znode exists but has no queues, null if the znode does not exist
+   */
+  List<String> getUnClaimedQueueIds(String regionserver);
+
+  /**
+   * Take ownership of the queue identified by queueId that belongs to a dead region server.
    * @param regionserver the id of the dead region server
-   * @return A SortedMap of the queues that have been claimed, including a SortedSet of WALs in
-   *         each queue. Returns an empty map if no queues were failed-over.
+   * @param queueId the id of the queue
+   * @return a Pair of the new queue id and a SortedSet of the WALs in the queue, or null if there
+   *         was no unclaimed queue
    */
-  SortedMap<String, SortedSet<String>> claimQueues(String regionserver);
+  Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId);
 
   /**
+   * Remove the znode of a region server if its queue list is empty.
+   * @param regionserver the id of the region server
+   */
+  void removeReplicatorIfQueueIsEmpty(String regionserver);
+
+  /**
    * Get a list of all region servers that have outstanding replication queues. These servers could
    * be alive, dead or from a previous run of the cluster.
    * @return a list of server names
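
For reference, the new interface is intended to be driven one queue at a time instead of through a single claimQueues() call (see the ReplicationSourceManager change further down). The following is a minimal sketch of such a caller; the class, method, and variable names are illustrative only and are not part of the patch:

import java.util.List;
import java.util.SortedSet;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.util.Pair;

public class ClaimQueuesSketch {
  /**
   * Drain a dead region server's replication queues one at a time.
   * "rq" is assumed to be already init()'ed for the surviving server and
   * "deadRs" is the dead server's znode name; both are assumptions for illustration.
   */
  static void claimAll(ReplicationQueues rq, String deadRs) {
    List<String> unclaimed = rq.getUnClaimedQueueIds(deadRs);
    while (unclaimed != null && !unclaimed.isEmpty()) {
      // Claim a single queue; other region servers may be claiming the rest in parallel.
      Pair<String, SortedSet<String>> claimed = rq.claimQueue(deadRs, unclaimed.get(0));
      if (claimed != null) {
        // getFirst() is the new queue id (old queue id + "-" + deadRs),
        // getSecond() is the sorted set of WALs still to be replicated.
        System.out.println("Claimed " + claimed.getFirst());
      }
      unclaimed = rq.getUnClaimedQueueIds(deadRs);
    }
    // Once the dead server has no queues left, its znode can be removed.
    rq.removeReplicatorIfQueueIsEmpty(deadRs);
  }
}
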

http://git-wip-us.apache.org/repos/asf/hbase/blob/fd297e28/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index c366a74..559ab41 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
@@ -174,24 +175,6 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
   }
 
   @Override
-  public SortedMap<String, SortedSet<String>> claimQueues(String regionserverZnode) {
-    SortedMap<String, SortedSet<String>> newQueues = new TreeMap<String, SortedSet<String>>();
-    // check whether there is multi support. If yes, use it.
-    if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
-      LOG.info("Atomically moving " + regionserverZnode + "'s WALs to my queue");
-      newQueues = copyQueuesFromRSUsingMulti(regionserverZnode);
-    } else {
-      LOG.info("Moving " + regionserverZnode + "'s wals to my queue");
-      if (!lockOtherRS(regionserverZnode)) {
-        return newQueues;
-      }
-      newQueues = copyQueuesFromRS(regionserverZnode);
-      deleteAnotherRSQueues(regionserverZnode);
-    }
-    return newQueues;
-  }
-
-  @Override
   public void removeAllQueues() {
     try {
       ZKUtil.deleteNodeRecursively(this.zookeeper, this.myQueuesZnode);
@@ -229,6 +212,74 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
     return listOfQueues;
   }
 
+  @Override
+  public boolean isThisOurRegionServer(String regionserver) {
+    return ZKUtil.joinZNode(this.queuesZNode, regionserver).equals(this.myQueuesZnode);
+  }
+
+  @Override
+  public List<String> getUnClaimedQueueIds(String regionserver) {
+    if (isThisOurRegionServer(regionserver)) {
+      return null;
+    }
+    String rsZnodePath = ZKUtil.joinZNode(this.queuesZNode, regionserver);
+    List<String> queues = null;
+    try {
+      queues = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZnodePath);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to getUnClaimedQueueIds for " + regionserver, e);
+    }
+    if (queues != null) {
+      queues.remove(RS_LOCK_ZNODE);
+    }
+    return queues;
+  }
+
+  @Override
+  public Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId) {
+    if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
+      LOG.info("Atomically moving " + regionserver + "/" + queueId + "'s WALs to my queue");
+      return moveQueueUsingMulti(regionserver, queueId);
+    } else {
+      LOG.info("Moving " + regionserver + "/" + queueId + "'s wals to my queue");
+      if (!lockOtherRS(regionserver)) {
+        LOG.info("Can not take the lock now");
+        return null;
+      }
+      Pair<String, SortedSet<String>> newQueues;
+      try {
+        newQueues = copyQueueFromLockedRS(regionserver, queueId);
+        removeQueueFromLockedRS(regionserver, queueId);
+      } finally {
+        unlockOtherRS(regionserver);
+      }
+      return newQueues;
+    }
+  }
+
+  private void removeQueueFromLockedRS(String znode, String peerId) {
+    String nodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
+    String peerPath = ZKUtil.joinZNode(nodePath, peerId);
+    try {
+      ZKUtil.deleteNodeRecursively(this.zookeeper, peerPath);
+    } catch (KeeperException e) {
+      LOG.warn("Remove copied queue failed", e);
+    }
+  }
+
+  @Override
+  public void removeReplicatorIfQueueIsEmpty(String regionserver) {
+    String rsPath = ZKUtil.joinZNode(this.queuesZNode, regionserver);
+    try {
+      List<String> list = ZKUtil.listChildrenNoWatch(this.zookeeper, rsPath);
+      if (list != null && list.size() == 0){
+        ZKUtil.deleteNode(this.zookeeper, rsPath);
+      }
+    } catch (KeeperException e) {
+      LOG.warn("Got error while removing replicator", e);
+    }
+  }
+
   /**
    * Try to set a lock in another region server's znode.
    * @param znode the server names of the other server
@@ -271,6 +322,16 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
     return ZKUtil.checkExists(zookeeper, getLockZNode(znode)) >= 0;
   }
 
+  private void unlockOtherRS(String znode){
+    String parent = ZKUtil.joinZNode(this.queuesZNode, znode);
+    String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
+    try {
+      ZKUtil.deleteNode(this.zookeeper, p);
+    } catch (KeeperException e) {
+      this.abortable.abort("Remove lock failed", e);
+    }
+  }
+
   /**
    * Delete all the replication queues for a given region server.
    * @param regionserverZnode The znode of the region server to delete.
@@ -308,38 +369,30 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
    * It "atomically" copies all the wals queues from another region server and returns them all
    * sorted per peer cluster (appended with the dead server's znode).
    * @param znode pertaining to the region server to copy the queues from
-   * @return WAL queues sorted per peer cluster
    */
-  private SortedMap<String, SortedSet<String>> copyQueuesFromRSUsingMulti(String znode) {
-    SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
-    // hbase/replication/rs/deadrs
-    String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
-    List<String> peerIdsToProcess = null;
-    List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
+  private Pair<String, SortedSet<String>> moveQueueUsingMulti(String znode, String peerId) {
     try {
-      peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath);
-      if (peerIdsToProcess == null) return queues; // node already processed
-      for (String peerId : peerIdsToProcess) {
-        ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
-        if (!peerExists(replicationQueueInfo.getPeerId())) {
-          // the orphaned queues must be moved, otherwise the delete op of dead rs will fail,
-          // this will cause the whole multi op fail.
-          // NodeFailoverWorker will skip the orphaned queues.
- LOG.warn("Peer " + peerId - + " didn't exist, will move its queue to avoid the failure of multi op"); - } - String newPeerId = peerId + "-" + znode; - String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId); - // check the logs queue for the old peer cluster - String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId); - List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode); - if (wals == null || wals.size() == 0) { - listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode)); - continue; // empty log queue. - } + // hbase/replication/rs/deadrs + String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode); + List<ZKUtilOp> listOfOps = new ArrayList<>(); + ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId); + if (!peerExists(replicationQueueInfo.getPeerId())) { + // the orphaned queues must be moved, otherwise the delete op of dead rs will fail, + // this will cause the whole multi op fail. + // NodeFailoverWorker will skip the orphaned queues. + LOG.warn("Peer " + peerId + + " didn't exist, will move its queue to avoid the failure of multi op"); + } + String newPeerId = peerId + "-" + znode; + String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId); + // check the logs queue for the old peer cluster + String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId); + List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode); + SortedSet<String> logQueue = new TreeSet<>(); + if (wals == null || wals.size() == 0) { + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode)); + } else { // create the new cluster znode - SortedSet<String> logQueue = new TreeSet<String>(); - queues.put(newPeerId, logQueue); ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY); listOfOps.add(op); // get the offset of the logs and set it to new znodes @@ -349,98 +402,86 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R LOG.debug("Creating " + wal + " with data " + Bytes.toString(logOffset)); String newLogZnode = ZKUtil.joinZNode(newPeerZnode, wal); listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset)); - // add ops for deleting listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode)); logQueue.add(wal); } // add delete op for peer listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode)); + + if (LOG.isTraceEnabled()) + LOG.trace(" The multi list size is: " + listOfOps.size()); } - // add delete op for dead rs, this will update the cversion of the parent. - // The reader will make optimistic locking with this to get a consistent - // snapshot - listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath)); - if (LOG.isTraceEnabled()) LOG.trace(" The multi list size is: " + listOfOps.size()); ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false); - if (LOG.isTraceEnabled()) LOG.trace("Atomically moved the dead regionserver logs. "); + if (LOG.isTraceEnabled()) + LOG.trace("Atomically moved the dead regionserver logs. "); + return new Pair<>(newPeerId, logQueue); } catch (KeeperException e) { // Multi call failed; it looks like some other regionserver took away the logs. 
LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e); - queues.clear(); } catch (InterruptedException e) { LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e); - queues.clear(); Thread.currentThread().interrupt(); } - return queues; + return null; } /** - * This methods copies all the wals queues from another region server and returns them all sorted + * This methods moves all the wals queues from another region server and returns them all sorted * per peer cluster (appended with the dead server's znode) * @param znode server names to copy - * @return all wals for all peers of that cluster, null if an error occurred + * @return all wals for the peer of that cluster, null if an error occurred */ - private SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) { + private Pair<String, SortedSet<String>> copyQueueFromLockedRS(String znode, String peerId) { // TODO this method isn't atomic enough, we could start copying and then // TODO fail for some reason and we would end up with znodes we don't want. - SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>(); try { String nodePath = ZKUtil.joinZNode(this.queuesZNode, znode); - List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath); - // We have a lock znode in there, it will count as one. - if (clusters == null || clusters.size() <= 1) { - return queues; + ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId); + String clusterPath = ZKUtil.joinZNode(nodePath, peerId); + if (!peerExists(replicationQueueInfo.getPeerId())) { + LOG.warn("Peer " + peerId + " didn't exist, skipping the replay"); + // Protection against moving orphaned queues + return null; } - // The lock isn't a peer cluster, remove it - clusters.remove(RS_LOCK_ZNODE); - for (String cluster : clusters) { - ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(cluster); - if (!peerExists(replicationQueueInfo.getPeerId())) { - LOG.warn("Peer " + cluster + " didn't exist, skipping the replay"); - // Protection against moving orphaned queues - continue; - } - // We add the name of the recovered RS to the new znode, we can even - // do that for queues that were recovered 10 times giving a znode like - // number-startcode-number-otherstartcode-number-anotherstartcode-etc - String newCluster = cluster + "-" + znode; - String newClusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, newCluster); - String clusterPath = ZKUtil.joinZNode(nodePath, cluster); - List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath); - // That region server didn't have anything to replicate for this cluster - if (wals == null || wals.size() == 0) { - continue; - } - ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode, + // We add the name of the recovered RS to the new znode, we can even + // do that for queues that were recovered 10 times giving a znode like + // number-startcode-number-otherstartcode-number-anotherstartcode-etc + String newCluster = peerId + "-" + znode; + String newClusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, newCluster); + + List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath); + // That region server didn't have anything to replicate for this cluster + if (wals == null || wals.size() == 0) { + return null; + } + ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode, HConstants.EMPTY_BYTE_ARRAY); - SortedSet<String> logQueue = new TreeSet<String>(); - queues.put(newCluster, logQueue); - for 
-        String z = ZKUtil.joinZNode(clusterPath, wal);
-        byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
-        long position = 0;
-        try {
-          position = ZKUtil.parseWALPositionFrom(positionBytes);
-        } catch (DeserializationException e) {
-          LOG.warn("Failed parse of wal position from the following znode: " + z
-              + ", Exception: " + e);
-        }
-        LOG.debug("Creating " + wal + " with data " + position);
-        String child = ZKUtil.joinZNode(newClusterZnode, wal);
-        // Position doesn't actually change, we are just deserializing it for
-        // logging, so just use the already serialized version
-        ZKUtil.createAndWatch(this.zookeeper, child, positionBytes);
-        logQueue.add(wal);
+      SortedSet<String> logQueue = new TreeSet<>();
+      for (String wal : wals) {
+        String z = ZKUtil.joinZNode(clusterPath, wal);
+        byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
+        long position = 0;
+        try {
+          position = ZKUtil.parseWALPositionFrom(positionBytes);
+        } catch (DeserializationException e) {
+          LOG.warn("Failed parse of wal position from the following znode: " + z
+              + ", Exception: " + e);
+        }
+        LOG.debug("Creating " + wal + " with data " + position);
+        String child = ZKUtil.joinZNode(newClusterZnode, wal);
+        // Position doesn't actually change, we are just deserializing it for
+        // logging, so just use the already serialized version
+        ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, child, positionBytes);
+        logQueue.add(wal);
+      }
+      return new Pair<>(newCluster, logQueue);
     } catch (KeeperException e) {
-      this.abortable.abort("Copy queues from rs", e);
+      LOG.warn("Got exception in copyQueueFromLockedRS: ", e);
    } catch (InterruptedException e) {
      LOG.warn(e);
      Thread.currentThread().interrupt();
    }
-    return queues;
+    return null;
   }
 
   /**
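
Both claim paths above rename the queue by appending the dead server's znode to the existing queue id, so a queue that has been failed over more than once carries the whole chain of dead servers in its name; this is what ReplicationQueueInfo.getDeadRegionServers() parses in the test changes below. A small illustration, with made-up server names:

public class QueueIdSketch {
  public static void main(String[] args) {
    String peerId = "1";
    String deadRs1 = "rs1.example.org,16020,1490000000000";   // invented znode name
    // First failover: a surviving server claims peer 1's queue from deadRs1.
    String claimedOnce = peerId + "-" + deadRs1;
    // If that server also dies, the next claimer appends its znode to the already-renamed queue.
    String deadRs2 = "rs2.example.org,16020,1490000000001";   // invented znode name
    String claimedTwice = claimedOnce + "-" + deadRs2;
    System.out.println(claimedOnce);   // 1-rs1.example.org,16020,1490000000000
    System.out.println(claimedTwice);  // 1-rs1.example.org,...-rs2.example.org,16020,1490000000001
  }
}
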

http://git-wip-us.apache.org/repos/asf/hbase/blob/fd297e28/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index c4dc800..b7a5839 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -32,7 +32,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
@@ -64,6 +63,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 
 /**
@@ -672,9 +672,28 @@ public class ReplicationSourceManager implements ReplicationListener {
         LOG.info("Not transferring queue since we are shutting down");
         return;
       }
-      SortedMap<String, SortedSet<String>> newQueues = null;
-      newQueues = this.rq.claimQueues(rsZnode);
+      Map<String, SortedSet<String>> newQueues = new HashMap<>();
+      List<String> peers = rq.getUnClaimedQueueIds(rsZnode);
+      while (peers != null && !peers.isEmpty()) {
+        Pair<String, SortedSet<String>> peer = this.rq.claimQueue(rsZnode,
+          peers.get(rand.nextInt(peers.size())));
+        long sleep = sleepBeforeFailover/2;
+        if (peer != null) {
+          newQueues.put(peer.getFirst(), peer.getSecond());
+          sleep = sleepBeforeFailover;
+        }
+        try {
+          Thread.sleep(sleep);
+        } catch (InterruptedException e) {
+          LOG.warn("Interrupted while waiting before transferring a queue.");
+          Thread.currentThread().interrupt();
+        }
+        peers = rq.getUnClaimedQueueIds(rsZnode);
+      }
+      if (peers != null) {
+        rq.removeReplicatorIfQueueIsEmpty(rsZnode);
+      }
       // Copying over the failed queue is completed.
       if (newQueues.isEmpty()) {
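
The loop above claims one randomly chosen queue at a time and sleeps between attempts, so a dead server's queues end up spread across several survivors instead of all landing on whichever server ran the old claimQueues() first. A toy, self-contained illustration of that effect follows; it is not HBase code, the server and queue names are made up, and a simple round-robin stands in for the real ZooKeeper arbitration between the survivors:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

public class FailoverSpreadSketch {
  public static void main(String[] args) {
    // Queues left behind by the dead server (invented names).
    List<String> deadServerQueues = new ArrayList<>(Arrays.asList("q1", "q2", "q3", "q4", "q5", "q6"));
    Map<String, List<String>> claimedBy = new HashMap<>();
    String[] survivors = {"rs1", "rs2", "rs3"};
    Random rand = new Random();
    int turn = 0;
    while (!deadServerQueues.isEmpty()) {
      // Each survivor wakes up after its own sleepBeforeFailover and grabs one random queue;
      // the round-robin "turn" approximates that interleaving.
      String rs = survivors[turn++ % survivors.length];
      String queue = deadServerQueues.remove(rand.nextInt(deadServerQueues.size()));
      claimedBy.computeIfAbsent(rs, k -> new ArrayList<>()).add(queue);
    }
    // With one-queue-at-a-time claiming, the queues are distributed over the survivors.
    System.out.println(claimedBy);
  }
}
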

http://git-wip-us.apache.org/repos/asf/hbase/blob/fd297e28/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index c8e234e..e0fe10f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -22,8 +22,6 @@ import static org.junit.Assert.*;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.SortedMap;
-import java.util.SortedSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -124,7 +122,7 @@ public abstract class TestReplicationStateBasic {
     assertNull(rq1.getAllQueues());
     assertEquals(0, rq1.getLogPosition("bogus", "bogus"));
     assertNull(rq1.getLogsInQueue("bogus"));
-    assertEquals(0, rq1.claimQueues(ServerName.valueOf("bogus", 1234, -1L).toString()).size());
+    assertNull(rq1.getUnClaimedQueueIds(ServerName.valueOf("bogus", 1234, -1L).toString()));
 
     rq1.setLogPosition("bogus", "bogus", 5L);
 
@@ -143,15 +141,21 @@ public abstract class TestReplicationStateBasic {
     assertEquals(1, rq2.getAllQueues().size());
     assertEquals(5, rq3.getAllQueues().size());
 
-    assertEquals(0, rq3.claimQueues(server1).size());
+    assertEquals(0, rq3.getUnClaimedQueueIds(server1).size());
+    rq3.removeReplicatorIfQueueIsEmpty(server1);
     assertEquals(2, rq3.getListOfReplicators().size());
 
-    SortedMap<String, SortedSet<String>> queues = rq2.claimQueues(server3);
+    List<String> queues = rq2.getUnClaimedQueueIds(server3);
     assertEquals(5, queues.size());
+    for(String queue: queues) {
+      rq2.claimQueue(server3, queue);
+    }
+    rq2.removeReplicatorIfQueueIsEmpty(server3);
     assertEquals(1, rq2.getListOfReplicators().size());
 
     // Try to claim our own queues
-    assertEquals(0, rq2.claimQueues(server2).size());
+    assertNull(rq2.getUnClaimedQueueIds(server2));
+    rq2.removeReplicatorIfQueueIsEmpty(server2);
 
     assertEquals(6, rq2.getAllQueues().size());


http://git-wip-us.apache.org/repos/asf/hbase/blob/fd297e28/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 012ef36..7614b0f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -36,6 +36,7 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.SortedMap;
 import java.util.SortedSet;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
@@ -80,6 +81,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -352,7 +354,7 @@ public class TestReplicationSourceManager {
         manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID(
             new Long(1), new Long(2)));
     w1.start();
-    w1.join(5000);
+    w1.join(10000);
     assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
     String id = "1-" + server.getServerName().getServerName();
     assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
@@ -385,18 +387,26 @@ public class TestReplicationSourceManager {
     ReplicationQueues rq1 =
         ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
     rq1.init(s1.getServerName().toString());
-    SortedMap<String, SortedSet<String>> testMap =
-        rq1.claimQueues(server.getServerName().getServerName());
+    String serverName = server.getServerName().getServerName();
+    List<String> unclaimed = rq1.getUnClaimedQueueIds(serverName);
+    rq1.claimQueue(serverName, unclaimed.get(0)).getSecond();
+    rq1.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
+
     ReplicationQueues rq2 =
         ReplicationFactory.getReplicationQueues(s2.getZooKeeper(), s2.getConfiguration(), s2);
     rq2.init(s2.getServerName().toString());
-    testMap = rq2.claimQueues(s1.getServerName().getServerName());
+    serverName = s1.getServerName().getServerName();
+    unclaimed = rq2.getUnClaimedQueueIds(serverName);
+    rq2.claimQueue(serverName, unclaimed.get(0)).getSecond();
+    rq2.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
     ReplicationQueues rq3 =
         ReplicationFactory.getReplicationQueues(s3.getZooKeeper(), s3.getConfiguration(), s3);
     rq3.init(s3.getServerName().toString());
-    testMap = rq3.claimQueues(s2.getServerName().getServerName());
-
-    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.firstKey());
+    serverName = s2.getServerName().getServerName();
+    unclaimed = rq3.getUnClaimedQueueIds(serverName);
+    String queue3 = rq3.claimQueue(serverName, unclaimed.get(0)).getFirst();
+    rq3.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
+    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queue3);
     List<String> result = replicationQueueInfo.getDeadRegionServers();
 
     // verify
@@ -432,7 +442,11 @@ public class TestReplicationSourceManager {
         ReplicationFactory.getReplicationQueuesClient(s1.getZooKeeper(), s1.getConfiguration(), s1);
     int v0 = client.getQueuesZNodeCversion();
-    rq1.claimQueues(s0.getServerName().getServerName());
+    List<String> queues = rq1.getUnClaimedQueueIds(s0.getServerName().getServerName());
+    for(String queue : queues) {
+      rq1.claimQueue(s0.getServerName().getServerName(), queue);
+    }
+    rq1.removeReplicatorIfQueueIsEmpty(s0.getServerName().getServerName());
     int v1 = client.getQueuesZNodeCversion();
     // cversion should increased by 1 since a child node is deleted
     assertEquals(v0 + 1, v1);
@@ -611,7 +625,12 @@ public class TestReplicationSourceManager {
     @Override
     public void run() {
       try {
-        logZnodesMap = rq.claimQueues(deadRsZnode);
+        logZnodesMap = new TreeMap<>();
+        List<String> queues = rq.getUnClaimedQueueIds(deadRsZnode);
+        for(String queue:queues){
+          Pair<String, SortedSet<String>> pair = rq.claimQueue(deadRsZnode, queue);
+          logZnodesMap.put(pair.getFirst(), pair.getSecond());
+        }
         server.abort("Done with testing", null);
       } catch (Exception e) {
         LOG.error("Got exception while running NodeFailoverWorker", e);