http://git-wip-us.apache.org/repos/asf/hbase/blob/c85716fc/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 0460280..0214241 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -34,18 +34,21 @@ import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; @@ -59,7 +62,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; -import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -95,7 +98,7 @@ public class ReplicationSourceManager implements ReplicationListener { private final List<ReplicationSourceInterface> sources; // List of all the sources we got from died RSs private final List<ReplicationSourceInterface> oldsources; - private final ReplicationQueues replicationQueues; + private final ReplicationQueueStorage queueStorage; private final ReplicationTracker replicationTracker; private final ReplicationPeers replicationPeers; // UUID for this cluster @@ -130,7 +133,7 @@ public class ReplicationSourceManager implements ReplicationListener { /** * Creates a replication manager and sets the watch on all the other registered region servers - * @param replicationQueues the interface for manipulating replication queues + * @param queueStorage the interface for manipulating replication queues * @param replicationPeers * @param replicationTracker * @param conf the configuration to use @@ -140,14 +143,14 @@ public class ReplicationSourceManager implements ReplicationListener { * @param oldLogDir the directory where old logs are archived * @param clusterId */ - public ReplicationSourceManager(ReplicationQueues replicationQueues, + public ReplicationSourceManager(ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf, Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId, WALFileLengthProvider walFileLengthProvider) throws IOException { //CopyOnWriteArrayList is thread-safe. //Generally, reading is more than modifying. this.sources = new CopyOnWriteArrayList<>(); - this.replicationQueues = replicationQueues; + this.queueStorage = queueStorage; this.replicationPeers = replicationPeers; this.replicationTracker = replicationTracker; this.server = server; @@ -184,6 +187,19 @@ public class ReplicationSourceManager implements ReplicationListener { connection = ConnectionFactory.createConnection(conf); } + @FunctionalInterface + private interface ReplicationQueueOperation { + void exec() throws ReplicationException; + } + + private void abortWhenFail(ReplicationQueueOperation op) { + try { + op.exec(); + } catch (ReplicationException e) { + server.abort("Failed to operate on replication queue", e); + } + } + /** * Provide the id of the peer and a log key and this method will figure which * wal it belongs to and will log, for this region server, the current @@ -195,12 +211,13 @@ public class ReplicationSourceManager implements ReplicationListener { * @param queueRecovered indicates if this queue comes from another region server * @param holdLogInZK if true then the log is retained in ZK */ - public void logPositionAndCleanOldLogs(Path log, String id, long position, - boolean queueRecovered, boolean holdLogInZK) { + public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered, + boolean holdLogInZK) { String fileName = log.getName(); - this.replicationQueues.setLogPosition(id, fileName, position); + abortWhenFail( + () -> this.queueStorage.setWALPosition(server.getServerName(), id, fileName, position)); if (holdLogInZK) { - return; + return; } cleanOldLogs(fileName, id, queueRecovered); } @@ -227,36 +244,59 @@ public class ReplicationSourceManager implements ReplicationListener { } } } - } + } private void cleanOldLogs(SortedSet<String> wals, String key, String id) { SortedSet<String> walSet = wals.headSet(key); - LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet); + if (LOG.isDebugEnabled()) { + LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet); + } for (String wal : walSet) { - this.replicationQueues.removeLog(id, wal); + abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal)); } walSet.clear(); } + private void adoptAbandonedQueues() { + List<ServerName> currentReplicators = null; + try { + currentReplicators = queueStorage.getListOfReplicators(); + } catch (ReplicationException e) { + server.abort("Failed to get all replicators", e); + return; + } + if (currentReplicators == null || currentReplicators.isEmpty()) { + return; + } + List<ServerName> otherRegionServers = replicationTracker.getListOfRegionServers().stream() + .map(ServerName::valueOf).collect(Collectors.toList()); + LOG.info( + "Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers); + + // Look if there's anything to process after a restart + for (ServerName rs : currentReplicators) { + if (!otherRegionServers.contains(rs)) { + transferQueues(rs); + } + } + } + /** - * Adds a normal source per registered peer cluster and tries to process all - * old region server wal queues + * Adds a normal source per registered peer cluster and tries to process all old region server wal + * queues + * <p> + * The returned future is for adoptAbandonedQueues task. */ - void init() throws IOException, ReplicationException { + Future<?> init() throws IOException, ReplicationException { for (String id : this.replicationPeers.getConnectedPeerIds()) { addSource(id); if (replicationForBulkLoadDataEnabled) { // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case // when a peer was added before replication for bulk loaded data was enabled. - this.replicationQueues.addPeerToHFileRefs(id); + this.queueStorage.addPeerToHFileRefs(id); } } - AdoptAbandonedQueuesWorker adoptionWorker = new AdoptAbandonedQueuesWorker(); - try { - this.executor.execute(adoptionWorker); - } catch (RejectedExecutionException ex) { - LOG.info("Cancelling the adoption of abandoned queues because of " + ex.getMessage()); - } + return this.executor.submit(this::adoptAbandonedQueues); } /** @@ -264,15 +304,12 @@ public class ReplicationSourceManager implements ReplicationListener { * need to enqueue the latest log of each wal group and do replication * @param id the id of the peer cluster * @return the source that was created - * @throws IOException */ @VisibleForTesting ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException { ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id); ReplicationPeer peer = replicationPeers.getConnectedPeer(id); - ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this, - this.replicationQueues, this.replicationPeers, server, id, this.clusterId, peerConfig, peer, - walFileLengthProvider); + ReplicationSourceInterface src = getReplicationSource(id, peerConfig, peer); synchronized (this.walsById) { this.sources.add(src); Map<String, SortedSet<String>> walsByGroup = new HashMap<>(); @@ -287,11 +324,10 @@ public class ReplicationSourceManager implements ReplicationListener { logs.add(name); walsByGroup.put(walPrefix, logs); try { - this.replicationQueues.addLog(id, name); + this.queueStorage.addWAL(server.getServerName(), id, name); } catch (ReplicationException e) { - String message = - "Cannot add log to queue when creating a new source, queueId=" + id - + ", filename=" + name; + String message = "Cannot add log to queue when creating a new source, queueId=" + id + + ", filename=" + name; server.stop(message); throw e; } @@ -316,7 +352,7 @@ public class ReplicationSourceManager implements ReplicationListener { * @param peerId Id of the peer cluster queue of wals to delete */ public void deleteSource(String peerId, boolean closeConnection) { - this.replicationQueues.removeQueue(peerId); + abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), peerId)); if (closeConnection) { this.replicationPeers.peerDisconnected(peerId); } @@ -376,8 +412,8 @@ public class ReplicationSourceManager implements ReplicationListener { } @VisibleForTesting - List<String> getAllQueues() { - return replicationQueues.getAllQueues(); + List<String> getAllQueues() throws ReplicationException { + return queueStorage.getAllQueues(server.getServerName()); } // public because of we call it in TestReplicationEmptyWALRecovery @@ -413,10 +449,10 @@ public class ReplicationSourceManager implements ReplicationListener { synchronized (replicationPeers) { for (String id : replicationPeers.getConnectedPeerIds()) { try { - this.replicationQueues.addLog(id, logName); + this.queueStorage.addWAL(server.getServerName(), id, logName); } catch (ReplicationException e) { - throw new IOException("Cannot add log to replication queue" - + " when creating a new source, queueId=" + id + ", filename=" + logName, e); + throw new IOException("Cannot add log to replication queue" + + " when creating a new source, queueId=" + id + ", filename=" + logName, e); } } } @@ -465,19 +501,11 @@ public class ReplicationSourceManager implements ReplicationListener { /** * Factory method to create a replication source - * @param conf the configuration to use - * @param fs the file system to use - * @param manager the manager to use - * @param server the server object for this region server * @param peerId the id of the peer cluster * @return the created source - * @throws IOException */ - private ReplicationSourceInterface getReplicationSource(Configuration conf, FileSystem fs, - ReplicationSourceManager manager, ReplicationQueues replicationQueues, - ReplicationPeers replicationPeers, Server server, String peerId, UUID clusterId, - ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer, - WALFileLengthProvider walFileLengthProvider) throws IOException { + private ReplicationSourceInterface getReplicationSource(String peerId, + ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer) throws IOException { RegionServerCoprocessorHost rsServerHost = null; TableDescriptors tableDescriptors = null; if (server instanceof HRegionServer) { @@ -494,9 +522,8 @@ public class ReplicationSourceManager implements ReplicationListener { // Default to HBase inter-cluster replication endpoint replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName(); } - @SuppressWarnings("rawtypes") - Class c = Class.forName(replicationEndpointImpl); - replicationEndpoint = (ReplicationEndpoint) c.newInstance(); + replicationEndpoint = Class.forName(replicationEndpointImpl) + .asSubclass(ReplicationEndpoint.class).newInstance(); if(rsServerHost != null) { ReplicationEndpoint newReplicationEndPoint = rsServerHost .postCreateReplicationEndPoint(replicationEndpoint); @@ -513,7 +540,7 @@ public class ReplicationSourceManager implements ReplicationListener { MetricsSource metrics = new MetricsSource(peerId); // init replication source - src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId, clusterId, + src.init(conf, fs, this, queueStorage, replicationPeers, server, peerId, clusterId, replicationEndpoint, walFileLengthProvider, metrics); // init replication endpoint @@ -524,21 +551,21 @@ public class ReplicationSourceManager implements ReplicationListener { } /** - * Transfer all the queues of the specified to this region server. - * First it tries to grab a lock and if it works it will move the - * znodes and finally will delete the old znodes. - * + * Transfer all the queues of the specified to this region server. First it tries to grab a lock + * and if it works it will move the znodes and finally will delete the old znodes. + * <p> * It creates one old source for any type of source of the old rs. - * @param rsZnode */ - private void transferQueues(String rsZnode) { - NodeFailoverWorker transfer = - new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers, - this.clusterId); + private void transferQueues(ServerName deadRS) { + if (server.getServerName().equals(deadRS)) { + // it's just us, give up + return; + } + NodeFailoverWorker transfer = new NodeFailoverWorker(deadRS); try { this.executor.execute(transfer); } catch (RejectedExecutionException ex) { - LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage()); + LOG.info("Cancelling the transfer of " + deadRS + " because of " + ex.getMessage()); } } @@ -575,7 +602,7 @@ public class ReplicationSourceManager implements ReplicationListener { LOG.info("Peer " + id + " connected success, trying to start the replication source thread."); addSource(id); if (replicationForBulkLoadDataEnabled) { - this.replicationQueues.addPeerToHFileRefs(id); + this.queueStorage.addPeerToHFileRefs(id); } } } @@ -628,12 +655,12 @@ public class ReplicationSourceManager implements ReplicationListener { deleteSource(id, true); } // Remove HFile Refs znode from zookeeper - this.replicationQueues.removePeerFromHFileRefs(id); + abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(id)); } @Override public void regionServerRemoved(String regionserver) { - transferQueues(regionserver); + transferQueues(ServerName.valueOf(regionserver)); } /** @@ -642,37 +669,21 @@ public class ReplicationSourceManager implements ReplicationListener { */ class NodeFailoverWorker extends Thread { - private String rsZnode; - private final ReplicationQueues rq; - private final ReplicationPeers rp; - private final UUID clusterId; + private final ServerName deadRS; - /** - * @param rsZnode - */ - public NodeFailoverWorker(String rsZnode) { - this(rsZnode, replicationQueues, replicationPeers, ReplicationSourceManager.this.clusterId); - } - - public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues, - final ReplicationPeers replicationPeers, final UUID clusterId) { - super("Failover-for-"+rsZnode); - this.rsZnode = rsZnode; - this.rq = replicationQueues; - this.rp = replicationPeers; - this.clusterId = clusterId; + @VisibleForTesting + public NodeFailoverWorker(ServerName deadRS) { + super("Failover-for-" + deadRS); + this.deadRS = deadRS; } @Override public void run() { - if (this.rq.isThisOurRegionServer(rsZnode)) { - return; - } // Wait a bit before transferring the queues, we may be shutting down. // This sleep may not be enough in some cases. try { Thread.sleep(sleepBeforeFailover + - (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover)); + (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover)); } catch (InterruptedException e) { LOG.warn("Interrupted while waiting before transferring a queue."); Thread.currentThread().interrupt(); @@ -683,25 +694,30 @@ public class ReplicationSourceManager implements ReplicationListener { return; } Map<String, Set<String>> newQueues = new HashMap<>(); - List<String> peers = rq.getUnClaimedQueueIds(rsZnode); - while (peers != null && !peers.isEmpty()) { - Pair<String, SortedSet<String>> peer = this.rq.claimQueue(rsZnode, - peers.get(ThreadLocalRandom.current().nextInt(peers.size()))); - long sleep = sleepBeforeFailover/2; - if (peer != null) { - newQueues.put(peer.getFirst(), peer.getSecond()); - sleep = sleepBeforeFailover; + try { + List<String> peers = queueStorage.getAllQueues(deadRS); + while (!peers.isEmpty()) { + Pair<String, SortedSet<String>> peer = queueStorage.claimQueue(deadRS, + peers.get(ThreadLocalRandom.current().nextInt(peers.size())), server.getServerName()); + long sleep = sleepBeforeFailover / 2; + if (!peer.getSecond().isEmpty()) { + newQueues.put(peer.getFirst(), peer.getSecond()); + sleep = sleepBeforeFailover; + } + try { + Thread.sleep(sleep); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting before transferring a queue."); + Thread.currentThread().interrupt(); + } + peers = queueStorage.getAllQueues(deadRS); } - try { - Thread.sleep(sleep); - } catch (InterruptedException e) { - LOG.warn("Interrupted while waiting before transferring a queue."); - Thread.currentThread().interrupt(); + if (!peers.isEmpty()) { + queueStorage.removeReplicatorIfQueueIsEmpty(deadRS); } - peers = rq.getUnClaimedQueueIds(rsZnode); - } - if (peers != null) { - rq.removeReplicatorIfQueueIsEmpty(rsZnode); + } catch (ReplicationException e) { + server.abort("Failed to claim queue from dead regionserver", e); + return; } // Copying over the failed queue is completed. if (newQueues.isEmpty()) { @@ -726,8 +742,8 @@ public class ReplicationSourceManager implements ReplicationListener { + ex); } if (peer == null || peerConfig == null) { - LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode); - replicationQueues.removeQueue(peerId); + LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS); + abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId)); continue; } // track sources in walsByIdRecoveredQueues @@ -744,13 +760,11 @@ public class ReplicationSourceManager implements ReplicationListener { } // enqueue sources - ReplicationSourceInterface src = - getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp, - server, peerId, this.clusterId, peerConfig, peer, walFileLengthProvider); + ReplicationSourceInterface src = getReplicationSource(peerId, peerConfig, peer); // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer // see removePeer synchronized (oldsources) { - if (!this.rp.getConnectedPeerIds().contains(src.getPeerId())) { + if (!replicationPeers.getConnectedPeerIds().contains(src.getPeerId())) { src.terminate("Recovered queue doesn't belong to any current peer"); closeRecoveredQueue(src); continue; @@ -769,29 +783,6 @@ public class ReplicationSourceManager implements ReplicationListener { } } - class AdoptAbandonedQueuesWorker extends Thread{ - - public AdoptAbandonedQueuesWorker() {} - - @Override - public void run() { - List<String> currentReplicators = replicationQueues.getListOfReplicators(); - if (currentReplicators == null || currentReplicators.isEmpty()) { - return; - } - List<String> otherRegionServers = replicationTracker.getListOfRegionServers(); - LOG.info("Current list of replicators: " + currentReplicators + " other RSs: " - + otherRegionServers); - - // Look if there's anything to process after a restart - for (String rs : currentReplicators) { - if (!otherRegionServers.contains(rs)) { - transferQueues(rs); - } - } - } - } - /** * Get the directory where wals are archived * @return the directory where wals are archived @@ -850,7 +841,11 @@ public class ReplicationSourceManager implements ReplicationListener { } public void cleanUpHFileRefs(String peerId, List<String> files) { - this.replicationQueues.removeHFileRefs(peerId, files); + abortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files)); + } + + int activeFailoverTaskCount() { + return executor.getActiveCount(); } /**
http://git-wip-us.apache.org/repos/asf/hbase/blob/c85716fc/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java index a5cd9b2..01a230d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; @@ -37,22 +36,19 @@ import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** - * In a scenario of Replication based Disaster/Recovery, when hbase - * Master-Cluster crashes, this tool is used to sync-up the delta from Master to - * Slave using the info from ZooKeeper. The tool will run on Master-Cluser, and - * assume ZK, Filesystem and NetWork still available after hbase crashes + * In a scenario of Replication based Disaster/Recovery, when hbase Master-Cluster crashes, this + * tool is used to sync-up the delta from Master to Slave using the info from ZooKeeper. The tool + * will run on Master-Cluser, and assume ZK, Filesystem and NetWork still available after hbase + * crashes * + * <pre> * hbase org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp + * </pre> */ - public class ReplicationSyncUp extends Configured implements Tool { - private static final Logger LOG = LoggerFactory.getLogger(ReplicationSyncUp.class.getName()); - private static Configuration conf; private static final long SLEEP_TIME = 10000; @@ -106,13 +102,14 @@ public class ReplicationSyncUp extends Configured implements Tool { replication = new Replication(); replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, null); manager = replication.getReplicationManager(); - manager.init(); + manager.init().get(); try { - int numberOfOldSource = 1; // default wait once - while (numberOfOldSource > 0) { + while (manager.activeFailoverTaskCount() > 0) { + Thread.sleep(SLEEP_TIME); + } + while (manager.getOldSources().size() > 0) { Thread.sleep(SLEEP_TIME); - numberOfOldSource = manager.getOldSources().size(); } } catch (InterruptedException e) { System.err.println("didn't wait long enough:" + e); @@ -122,7 +119,7 @@ public class ReplicationSyncUp extends Configured implements Tool { manager.join(); zkw.close(); - return (0); + return 0; } static class DummyServer implements Server { http://git-wip-us.apache.org/repos/asf/hbase/blob/c85716fc/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index 768bd83..2f518c7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -46,9 +46,8 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.replication.ReplicationFactory; -import org.apache.hadoop.hbase.replication.ReplicationQueues; -import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -117,9 +116,8 @@ public class TestLogsCleaner { HMaster.decorateMasterConfiguration(conf); Server server = new DummyServer(); - ReplicationQueues repQueues = ReplicationFactory.getReplicationQueues( - new ReplicationQueuesArguments(conf, server, server.getZooKeeper())); - repQueues.init(server.getServerName().toString()); + ReplicationQueueStorage queueStorage = + ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf); final Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME); final Path oldProcedureWALDir = new Path(oldLogDir, "masterProcedureWALs"); String fakeMachineName = URLEncoder.encode(server.getServerName().toString(), "UTF8"); @@ -150,7 +148,7 @@ public class TestLogsCleaner { // Case 4: put 3 WALs in ZK indicating that they are scheduled for replication so these // files would pass TimeToLiveLogCleaner but would be rejected by ReplicationLogCleaner if (i % (30 / 3) == 1) { - repQueues.addLog(fakeMachineName, fileName.getName()); + queueStorage.addWAL(server.getServerName(), fakeMachineName, fileName.getName()); LOG.info("Replication log file: " + fileName); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c85716fc/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java index 102c8d9..e1eb822 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java @@ -48,8 +48,8 @@ import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; -import org.apache.hadoop.hbase.replication.ReplicationQueues; -import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -81,16 +81,13 @@ public class TestReplicationHFileCleaner { private static final Logger LOG = LoggerFactory.getLogger(TestReplicationHFileCleaner.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static Server server; - private static ReplicationQueues rq; + private static ReplicationQueueStorage rq; private static ReplicationPeers rp; private static final String peerId = "TestReplicationHFileCleaner"; private static Configuration conf = TEST_UTIL.getConfiguration(); static FileSystem fs = null; Path root; - /** - * @throws java.lang.Exception - */ @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.startMiniZKCluster(); @@ -99,20 +96,10 @@ public class TestReplicationHFileCleaner { HMaster.decorateMasterConfiguration(conf); rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server); rp.init(); - rq = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server, server.getZooKeeper())); - rq.init(server.getServerName().toString()); - try { - fs = FileSystem.get(conf); - } finally { - if (fs != null) { - fs.close(); - } - } + rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf); + fs = FileSystem.get(conf); } - /** - * @throws java.lang.Exception - */ @AfterClass public static void tearDownAfterClass() throws Exception { TEST_UTIL.shutdownMiniZKCluster(); http://git-wip-us.apache.org/repos/asf/hbase/blob/c85716fc/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java index 22ed1a3..4bcde0a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java @@ -26,10 +26,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.replication.ReplicationFactory; -import org.apache.hadoop.hbase.replication.ReplicationQueues; -import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; -import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; @@ -55,14 +53,12 @@ public class TestReplicationZKNodeCleaner { private final Configuration conf; private final ZKWatcher zkw; - private final ReplicationQueues repQueues; + private final ReplicationQueueStorage repQueues; public TestReplicationZKNodeCleaner() throws Exception { conf = TEST_UTIL.getConfiguration(); zkw = new ZKWatcher(conf, "TestReplicationZKNodeCleaner", null); - repQueues = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, null, - zkw)); - assertTrue(repQueues instanceof ReplicationQueuesZKImpl); + repQueues = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf); } @BeforeClass @@ -78,9 +74,8 @@ public class TestReplicationZKNodeCleaner { @Test public void testReplicationZKNodeCleaner() throws Exception { - repQueues.init(SERVER_ONE.getServerName()); // add queue for ID_ONE which isn't exist - repQueues.addLog(ID_ONE, "file1"); + repQueues.addWAL(SERVER_ONE, ID_ONE, "file1"); ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null); Map<ServerName, List<String>> undeletedQueues = cleaner.getUnDeletedQueues(); @@ -90,7 +85,7 @@ public class TestReplicationZKNodeCleaner { assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_ONE)); // add a recovery queue for ID_TWO which isn't exist - repQueues.addLog(ID_TWO + "-" + SERVER_TWO, "file2"); + repQueues.addWAL(SERVER_ONE, ID_TWO + "-" + SERVER_TWO, "file2"); undeletedQueues = cleaner.getUnDeletedQueues(); assertEquals(1, undeletedQueues.size()); @@ -106,11 +101,10 @@ public class TestReplicationZKNodeCleaner { @Test public void testReplicationZKNodeCleanerChore() throws Exception { - repQueues.init(SERVER_ONE.getServerName()); // add queue for ID_ONE which isn't exist - repQueues.addLog(ID_ONE, "file1"); + repQueues.addWAL(SERVER_ONE, ID_ONE, "file1"); // add a recovery queue for ID_TWO which isn't exist - repQueues.addLog(ID_TWO + "-" + SERVER_TWO, "file2"); + repQueues.addWAL(SERVER_ONE, ID_TWO + "-" + SERVER_TWO, "file2"); // Wait the cleaner chore to run Thread.sleep(20000); http://git-wip-us.apache.org/repos/asf/hbase/blob/c85716fc/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index 7ea79f9..14c5e56 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -46,9 +45,10 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { MetricsSource metrics; WALFileLengthProvider walFileLengthProvider; AtomicBoolean startup = new AtomicBoolean(false); + @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, - ReplicationQueues rq, ReplicationPeers rp, Server server, String peerClusterId, + ReplicationQueueStorage rq, ReplicationPeers rp, Server server, String peerClusterId, UUID clusterId, ReplicationEndpoint replicationEndpoint, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { this.manager = manager; http://git-wip-us.apache.org/repos/asf/hbase/blob/c85716fc/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java index 1672390..19aeac1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -69,7 +69,6 @@ public class TestReplicationSyncUpTool extends TestReplicationBase { @Before public void setUp() throws Exception { - HColumnDescriptor fam; t1_syncupSource = new HTableDescriptor(t1_su); @@ -105,7 +104,7 @@ public class TestReplicationSyncUpTool extends TestReplicationBase { * check's gone Also check the puts and deletes are not replicated back to * the originating cluster. */ - @Test(timeout = 300000) + @Test public void testSyncUpTool() throws Exception { /** @@ -181,7 +180,6 @@ public class TestReplicationSyncUpTool extends TestReplicationBase { * verify correctly replicated to Slave */ mimicSyncUpAfterPut(); - } protected void setupReplication() throws Exception { http://git-wip-us.apache.org/repos/asf/hbase/blob/c85716fc/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index d1a9f83..1d3b6ea 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -68,10 +68,10 @@ import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; -import org.apache.hadoop.hbase.replication.ReplicationQueues; -import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; @@ -338,18 +338,14 @@ public abstract class TestReplicationSourceManager { @Test public void testClaimQueues() throws Exception { - final Server server = new DummyServer("hostname0.example.org"); - - - ReplicationQueues rq = - ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server, - server.getZooKeeper())); - rq.init(server.getServerName().toString()); + Server server = new DummyServer("hostname0.example.org"); + ReplicationQueueStorage rq = ReplicationStorageFactory + .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration()); // populate some znodes in the peer znode files.add("log1"); files.add("log2"); for (String file : files) { - rq.addLog("1", file); + rq.addWAL(server.getServerName(), "1", file); } // create 3 DummyServers Server s1 = new DummyServer("dummyserver1.example.org"); @@ -357,12 +353,9 @@ public abstract class TestReplicationSourceManager { Server s3 = new DummyServer("dummyserver3.example.org"); // create 3 DummyNodeFailoverWorkers - DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker( - server.getServerName().getServerName(), s1); - DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker( - server.getServerName().getServerName(), s2); - DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker( - server.getServerName().getServerName(), s3); + DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(server.getServerName(), s1); + DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(server.getServerName(), s2); + DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(server.getServerName(), s3); latch = new CountDownLatch(3); // start the threads @@ -381,11 +374,9 @@ public abstract class TestReplicationSourceManager { @Test public void testCleanupFailoverQueues() throws Exception { - final Server server = new DummyServer("hostname1.example.org"); - ReplicationQueues rq = - ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server, - server.getZooKeeper())); - rq.init(server.getServerName().toString()); + Server server = new DummyServer("hostname1.example.org"); + ReplicationQueueStorage rq = ReplicationStorageFactory + .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration()); // populate some znodes in the peer znode SortedSet<String> files = new TreeSet<>(); String group = "testgroup"; @@ -394,19 +385,14 @@ public abstract class TestReplicationSourceManager { files.add(file1); files.add(file2); for (String file : files) { - rq.addLog("1", file); + rq.addWAL(server.getServerName(), "1", file); } Server s1 = new DummyServer("dummyserver1.example.org"); - ReplicationQueues rq1 = - ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1, - s1.getZooKeeper())); - rq1.init(s1.getServerName().toString()); ReplicationPeers rp1 = ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1); rp1.init(); NodeFailoverWorker w1 = - manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID( - new Long(1), new Long(2))); + manager.new NodeFailoverWorker(server.getServerName()); w1.run(); assertEquals(1, manager.getWalsByIdRecoveredQueues().size()); String id = "1-" + server.getServerName().getServerName(); @@ -418,17 +404,16 @@ public abstract class TestReplicationSourceManager { @Test public void testCleanupUnknownPeerZNode() throws Exception { - final Server server = new DummyServer("hostname2.example.org"); - ReplicationQueues rq = ReplicationFactory.getReplicationQueues( - new ReplicationQueuesArguments(server.getConfiguration(), server, server.getZooKeeper())); - rq.init(server.getServerName().toString()); + Server server = new DummyServer("hostname2.example.org"); + ReplicationQueueStorage rq = ReplicationStorageFactory + .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration()); // populate some znodes in the peer znode // add log to an unknown peer String group = "testgroup"; - rq.addLog("2", group + ".log1"); - rq.addLog("2", group + ".log2"); + rq.addWAL(server.getServerName(), "2", group + ".log1"); + rq.addWAL(server.getServerName(), "2", group + ".log2"); - NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName().getServerName()); + NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName()); w1.run(); // The log of the unknown peer should be removed from zk @@ -506,10 +491,8 @@ public abstract class TestReplicationSourceManager { .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase"); try { DummyServer server = new DummyServer(); - final ReplicationQueues rq = - ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments( - server.getConfiguration(), server, server.getZooKeeper())); - rq.init(server.getServerName().toString()); + ReplicationQueueStorage rq = ReplicationStorageFactory + .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration()); // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface // initialization to throw an exception. conf.set("replication.replicationsource.implementation", @@ -523,11 +506,11 @@ public abstract class TestReplicationSourceManager { assertNull(manager.getSource(peerId)); // Create a replication queue for the fake peer - rq.addLog(peerId, "FakeFile"); + rq.addWAL(server.getServerName(), peerId, "FakeFile"); // Unregister peer, this should remove the peer and clear all queues associated with it // Need to wait for the ReplicationTracker to pick up the changes and notify listeners. removePeerAndWait(peerId); - assertFalse(rq.getAllQueues().contains(peerId)); + assertFalse(rq.getAllQueues(server.getServerName()).contains(peerId)); } finally { conf.set("replication.replicationsource.implementation", replicationSourceImplName); removePeerAndWait(peerId); @@ -650,11 +633,12 @@ public abstract class TestReplicationSourceManager { } } Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { - @Override public boolean evaluate() throws Exception { + @Override + public boolean evaluate() throws Exception { List<String> peers = rp.getAllPeerIds(); - return (!manager.getAllQueues().contains(peerId)) && (rp.getConnectedPeer(peerId) == null) - && (!peers.contains(peerId)) - && manager.getSource(peerId) == null; + return (!manager.getAllQueues().contains(peerId)) && + (rp.getConnectedPeer(peerId) == null) && (!peers.contains(peerId)) && + manager.getSource(peerId) == null; } }); } @@ -697,25 +681,24 @@ public abstract class TestReplicationSourceManager { static class DummyNodeFailoverWorker extends Thread { private Map<String, Set<String>> logZnodesMap; Server server; - private String deadRsZnode; - ReplicationQueues rq; + private ServerName deadRS; + ReplicationQueueStorage rq; - public DummyNodeFailoverWorker(String znode, Server s) throws Exception { - this.deadRsZnode = znode; + public DummyNodeFailoverWorker(ServerName deadRS, Server s) throws Exception { + this.deadRS = deadRS; this.server = s; - this.rq = - ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server, - server.getZooKeeper())); - this.rq.init(this.server.getServerName().toString()); + this.rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), + server.getConfiguration()); } @Override public void run() { try { logZnodesMap = new HashMap<>(); - List<String> queues = rq.getUnClaimedQueueIds(deadRsZnode); - for(String queue:queues){ - Pair<String, SortedSet<String>> pair = rq.claimQueue(deadRsZnode, queue); + List<String> queues = rq.getAllQueues(deadRS); + for (String queue : queues) { + Pair<String, SortedSet<String>> pair = + rq.claimQueue(deadRS, queue, server.getServerName()); if (pair != null) { logZnodesMap.put(pair.getFirst(), pair.getSecond()); } @@ -754,7 +737,7 @@ public abstract class TestReplicationSourceManager { @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, - ReplicationQueues rq, ReplicationPeers rp, Server server, String peerClusterId, + ReplicationQueueStorage rq, ReplicationPeers rp, Server server, String peerClusterId, UUID clusterId, ReplicationEndpoint replicationEndpoint, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { throw new IOException("Failing deliberately"); http://git-wip-us.apache.org/repos/asf/hbase/blob/c85716fc/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java index 4d81d1d..8e0ab0f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java @@ -25,11 +25,10 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; -import org.apache.hadoop.hbase.replication.ReplicationQueues; -import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.junit.BeforeClass; @@ -42,7 +41,7 @@ import org.junit.experimental.categories.Category; * ReplicationQueuesClientZkImpl. Also includes extra tests outside of those in * TestReplicationSourceManager that test ReplicationQueueZkImpl-specific behaviors. */ -@Category({ReplicationTests.class, MediumTests.class}) +@Category({ ReplicationTests.class, MediumTests.class }) public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceManager { @ClassRule @@ -64,16 +63,14 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan // Tests the naming convention of adopted queues for ReplicationQueuesZkImpl @Test public void testNodeFailoverDeadServerParsing() throws Exception { - final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com"); - ReplicationQueues repQueues = - ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server, - server.getZooKeeper())); - repQueues.init(server.getServerName().toString()); + Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com"); + ReplicationQueueStorage queueStorage = + ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf); // populate some znodes in the peer znode files.add("log1"); files.add("log2"); for (String file : files) { - repQueues.addLog("1", file); + queueStorage.addWAL(server.getServerName(), "1", file); } // create 3 DummyServers @@ -82,30 +79,22 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com"); // simulate three servers fail sequentially - ReplicationQueues rq1 = - ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1, - s1.getZooKeeper())); - rq1.init(s1.getServerName().toString()); - String serverName = server.getServerName().getServerName(); - List<String> unclaimed = rq1.getUnClaimedQueueIds(serverName); - rq1.claimQueue(serverName, unclaimed.get(0)).getSecond(); - rq1.removeReplicatorIfQueueIsEmpty(unclaimed.get(0)); - ReplicationQueues rq2 = - ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s2.getConfiguration(), s2, - s2.getZooKeeper())); - rq2.init(s2.getServerName().toString()); - serverName = s1.getServerName().getServerName(); - unclaimed = rq2.getUnClaimedQueueIds(serverName); - rq2.claimQueue(serverName, unclaimed.get(0)).getSecond(); - rq2.removeReplicatorIfQueueIsEmpty(unclaimed.get(0)); - ReplicationQueues rq3 = - ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s3.getConfiguration(), s3, - s3.getZooKeeper())); - rq3.init(s3.getServerName().toString()); - serverName = s2.getServerName().getServerName(); - unclaimed = rq3.getUnClaimedQueueIds(serverName); - String queue3 = rq3.claimQueue(serverName, unclaimed.get(0)).getFirst(); - rq3.removeReplicatorIfQueueIsEmpty(unclaimed.get(0)); + ServerName serverName = server.getServerName(); + List<String> unclaimed = queueStorage.getAllQueues(serverName); + queueStorage.claimQueue(serverName, unclaimed.get(0), s1.getServerName()); + queueStorage.removeReplicatorIfQueueIsEmpty(serverName); + + serverName = s1.getServerName(); + unclaimed = queueStorage.getAllQueues(serverName); + queueStorage.claimQueue(serverName, unclaimed.get(0), s2.getServerName()); + queueStorage.removeReplicatorIfQueueIsEmpty(serverName); + + serverName = s2.getServerName(); + unclaimed = queueStorage.getAllQueues(serverName); + String queue3 = + queueStorage.claimQueue(serverName, unclaimed.get(0), s3.getServerName()).getFirst(); + queueStorage.removeReplicatorIfQueueIsEmpty(serverName); + ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queue3); List<ServerName> result = replicationQueueInfo.getDeadRegionServers(); // verify