HBASE-19636 All region servers should already be working with the new peer change when the replication peer procedure finishes
Signed-off-by: zhangduo <zhang...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d36aacdf Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d36aacdf Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d36aacdf Branch: refs/heads/branch-2 Commit: d36aacdf9ea856fbe981073ac079181143e81c09 Parents: 53b18fe Author: Guanghao Zhang <zg...@apache.org> Authored: Thu Jan 4 16:58:01 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Fri Mar 9 20:55:48 2018 +0800 ---------------------------------------------------------------------- .../replication/ReplicationPeerConfig.java | 1 - .../hbase/replication/ReplicationPeerImpl.java | 4 +- .../hbase/replication/ReplicationQueueInfo.java | 23 +- .../hbase/replication/ReplicationUtils.java | 56 ++ .../replication/TestReplicationStateZKImpl.java | 21 - .../regionserver/ReplicationSourceService.java | 3 +- .../regionserver/PeerProcedureHandler.java | 3 + .../regionserver/PeerProcedureHandlerImpl.java | 50 +- .../RecoveredReplicationSource.java | 6 +- .../RecoveredReplicationSourceShipper.java | 8 +- .../replication/regionserver/Replication.java | 11 +- .../regionserver/ReplicationSource.java | 34 +- .../regionserver/ReplicationSourceFactory.java | 4 +- .../ReplicationSourceInterface.java | 8 +- .../regionserver/ReplicationSourceManager.java | 827 ++++++++++--------- .../regionserver/ReplicationSourceShipper.java | 6 +- .../ReplicationSourceWALReader.java | 2 +- .../replication/ReplicationSourceDummy.java | 2 +- .../replication/TestNamespaceReplication.java | 57 +- .../TestReplicationSourceManager.java | 5 +- 20 files changed, 622 insertions(+), 509 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/d36aacdf/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java 
---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index fdae288..bf8d030 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; - import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; http://git-wip-us.apache.org/repos/asf/hbase/blob/d36aacdf/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java index 3e17025..604e0bb 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -28,6 +27,7 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public class ReplicationPeerImpl implements ReplicationPeer { + private final Configuration conf; private final String id; http://git-wip-us.apache.org/repos/asf/hbase/blob/d36aacdf/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java index ecd888f..cd65f9b 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java @@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.ServerName; /** - * This class is responsible for the parsing logic for a znode representing a queue. + * This class is responsible for the parsing logic for a queue id representing a queue. * It will extract the peerId if it's recovered as well as the dead region servers * that were part of the queue's history. 
*/ @@ -38,21 +38,20 @@ public class ReplicationQueueInfo { private static final Logger LOG = LoggerFactory.getLogger(ReplicationQueueInfo.class); private final String peerId; - private final String peerClusterZnode; + private final String queueId; private boolean queueRecovered; // List of all the dead region servers that had this queue (if recovered) private List<ServerName> deadRegionServers = new ArrayList<>(); /** - * The passed znode will be either the id of the peer cluster or - * the handling story of that queue in the form of id-servername-* + * The passed queueId will be either the id of the peer or the handling story of that queue + * in the form of id-servername-* */ - public ReplicationQueueInfo(String znode) { - this.peerClusterZnode = znode; - String[] parts = znode.split("-", 2); + public ReplicationQueueInfo(String queueId) { + this.queueId = queueId; + String[] parts = queueId.split("-", 2); this.queueRecovered = parts.length != 1; - this.peerId = this.queueRecovered ? - parts[0] : peerClusterZnode; + this.peerId = this.queueRecovered ? parts[0] : queueId; if (parts.length >= 2) { // extract dead servers extractDeadServersFromZNodeString(parts[1], this.deadRegionServers); @@ -60,7 +59,7 @@ public class ReplicationQueueInfo { } /** - * Parse dead server names from znode string servername can contain "-" such as + * Parse dead server names from queue id. servername can contain "-" such as * "ip-10-46-221-101.ec2.internal", so we need skip some "-" during parsing for the following * cases: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125-<server name>-... 
*/ @@ -119,8 +118,8 @@ public class ReplicationQueueInfo { return this.peerId; } - public String getPeerClusterZnode() { - return this.peerClusterZnode; + public String getQueueId() { + return this.queueId; } public boolean isQueueRecovered() { http://git-wip-us.apache.org/repos/asf/hbase/blob/d36aacdf/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java index be70e6e..ca871ea 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java @@ -18,12 +18,16 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; +import java.util.Collection; import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; /** @@ -76,4 +80,56 @@ public final class ReplicationUtils { queueStorage.removeReplicatorIfQueueIsEmpty(replicator); } } + + private static boolean isCollectionEqual(Collection<String> c1, Collection<String> c2) { + if (c1 == null) { + return c2 == null; + } + if (c2 == null) { + return false; + } + return c1.size() == c2.size() && c1.containsAll(c2); + } + + private static boolean isNamespacesEqual(Set<String> ns1, Set<String> ns2) { + return isCollectionEqual(ns1, ns2); + } + + private static boolean isTableCFsEqual(Map<TableName, List<String>> tableCFs1, + Map<TableName, 
List<String>> tableCFs2) { + if (tableCFs1 == null) { + return tableCFs2 == null; + } + if (tableCFs2 == null) { + return false; + } + if (tableCFs1.size() != tableCFs2.size()) { + return false; + } + for (Map.Entry<TableName, List<String>> entry1 : tableCFs1.entrySet()) { + TableName table = entry1.getKey(); + if (!tableCFs2.containsKey(table)) { + return false; + } + List<String> cfs1 = entry1.getValue(); + List<String> cfs2 = tableCFs2.get(table); + if (!isCollectionEqual(cfs1, cfs2)) { + return false; + } + } + return true; + } + + public static boolean isKeyConfigEqual(ReplicationPeerConfig rpc1, ReplicationPeerConfig rpc2) { + if (rpc1.replicateAllUserTables() != rpc2.replicateAllUserTables()) { + return false; + } + if (rpc1.replicateAllUserTables()) { + return isNamespacesEqual(rpc1.getExcludeNamespaces(), rpc2.getExcludeNamespaces()) && + isTableCFsEqual(rpc1.getExcludeTableCFsMap(), rpc2.getExcludeTableCFsMap()); + } else { + return isNamespacesEqual(rpc1.getNamespaces(), rpc2.getNamespaces()) && + isTableCFsEqual(rpc1.getTableCFsMap(), rpc2.getTableCFsMap()); + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/d36aacdf/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java index 1830103..08178f4 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; import 
org.apache.hadoop.hbase.ClusterId; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseZKTestingUtility; @@ -38,8 +37,6 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.experimental.categories.Category; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @Category({ ReplicationTests.class, MediumTests.class }) public class TestReplicationStateZKImpl extends TestReplicationStateBasic { @@ -48,8 +45,6 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestReplicationStateZKImpl.class); - private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateZKImpl.class); - private static Configuration conf; private static HBaseZKTestingUtility utility; private static ZKWatcher zkw; @@ -97,20 +92,4 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { public static void tearDownAfterClass() throws Exception { utility.shutdownMiniZKCluster(); } - - private static class WarnOnlyAbortable implements Abortable { - - @Override - public void abort(String why, Throwable e) { - LOG.warn("TestReplicationStateZKImpl received abort, ignoring. 
Reason: " + why); - if (LOG.isDebugEnabled()) { - LOG.debug(e.toString(), e); - } - } - - @Override - public boolean isAborted() { - return false; - } - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/d36aacdf/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java index 977dd15..23ba773 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information http://git-wip-us.apache.org/repos/asf/hbase/blob/d36aacdf/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java index b392985..65da9af 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java @@ -23,6 +23,9 @@ import java.io.IOException; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.yetus.audience.InterfaceAudience; +/** + * A handler for modifying replication peer in peer procedures. 
+ */ @InterfaceAudience.Private public interface PeerProcedureHandler { http://git-wip-us.apache.org/repos/asf/hbase/blob/d36aacdf/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java index c09c6a0..ce8fdae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java @@ -15,21 +15,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.util.concurrent.locks.Lock; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerImpl; +import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.util.KeyLocker; import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @InterfaceAudience.Private public class PeerProcedureHandlerImpl implements PeerProcedureHandler { - private static final Logger LOG = LoggerFactory.getLogger(PeerProcedureHandlerImpl.class); private final ReplicationSourceManager replicationSourceManager; private final KeyLocker<String> peersLock = new KeyLocker<>(); @@ -39,7 +38,7 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { } @Override - public void addPeer(String peerId) throws 
ReplicationException, IOException { + public void addPeer(String peerId) throws IOException { Lock peerLock = peersLock.acquireLock(peerId); try { replicationSourceManager.addPeer(peerId); @@ -49,7 +48,7 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { } @Override - public void removePeer(String peerId) throws ReplicationException, IOException { + public void removePeer(String peerId) throws IOException { Lock peerLock = peersLock.acquireLock(peerId); try { if (replicationSourceManager.getReplicationPeers().getPeer(peerId) != null) { @@ -60,35 +59,50 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { } } - @Override - public void disablePeer(String peerId) throws ReplicationException, IOException { + private void refreshPeerState(String peerId) throws ReplicationException, IOException { PeerState newState; Lock peerLock = peersLock.acquireLock(peerId); try { + ReplicationPeerImpl peer = replicationSourceManager.getReplicationPeers().getPeer(peerId); + if (peer == null) { + throw new ReplicationException("Peer with id=" + peerId + " is not cached."); + } + PeerState oldState = peer.getPeerState(); newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId); + // RS need to start work with the new replication state change + if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) { + replicationSourceManager.refreshSources(peerId); + } } finally { peerLock.unlock(); } - LOG.info("disable replication peer, id: {}, new state: {}", peerId, newState); } @Override public void enablePeer(String peerId) throws ReplicationException, IOException { - PeerState newState; - Lock peerLock = peersLock.acquireLock(peerId); - try { - newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId); - } finally { - peerLock.unlock(); - } - LOG.info("enable replication peer, id: {}, new state: {}", peerId, newState); + refreshPeerState(peerId); + } + + @Override + public 
void disablePeer(String peerId) throws ReplicationException, IOException { + refreshPeerState(peerId); } @Override public void updatePeerConfig(String peerId) throws ReplicationException, IOException { Lock peerLock = peersLock.acquireLock(peerId); try { - replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId); + ReplicationPeerImpl peer = replicationSourceManager.getReplicationPeers().getPeer(peerId); + if (peer == null) { + throw new ReplicationException("Peer with id=" + peerId + " is not cached."); + } + ReplicationPeerConfig oldConfig = peer.getPeerConfig(); + ReplicationPeerConfig newConfig = + replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId); + // RS need to start work with the new replication config change + if (!ReplicationUtils.isKeyConfigEqual(oldConfig, newConfig)) { + replicationSourceManager.refreshSources(peerId); + } } finally { peerLock.unlock(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/d36aacdf/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index 7bceb78..1be9a88 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -81,7 +81,7 @@ public class RecoveredReplicationSource extends ReplicationSource { ReplicationSourceWALReader walReader = new RecoveredReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); Threads.setDaemonThreadRunning(walReader, threadName - + ".replicationSource.replicationWALReaderThread." 
+ walGroupId + "," + peerClusterZnode, + + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + queueId, getUncaughtExceptionHandler()); return walReader; } @@ -178,8 +178,8 @@ public class RecoveredReplicationSource extends ReplicationSource { } } if (allTasksDone) { - manager.closeRecoveredQueue(this); - LOG.info("Finished recovering queue " + peerClusterZnode + " with the following stats: " + manager.removeRecoveredSource(this); + LOG.info("Finished recovering queue " + queueId + " with the following stats: " + getStats()); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/d36aacdf/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java index 1c0bbee..38bbb48 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java @@ -76,7 +76,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper shipEdits(entryBatch); if (entryBatch.getWalEntries().isEmpty()) { LOG.debug("Finished recovering queue for group " + walGroupId + " of peer " - + source.getPeerClusterZnode()); + + source.getQueueId()); source.getSourceMetrics().incrCompletedRecoveryQueue(); setWorkerState(WorkerState.FINISHED); continue; @@ -113,7 +113,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper // normally has a position (unless the RS failed between 2 logs) private long getRecoveredQueueStartPos() { long startPosition = 0; - String peerClusterZnode = 
source.getPeerClusterZnode(); + String peerClusterZnode = source.getQueueId(); try { startPosition = this.replicationQueues.getWALPosition(source.getServerWALsBelongTo(), peerClusterZnode, this.queue.peek().getName()); @@ -129,8 +129,8 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper @Override protected void updateLogPosition(long lastReadPosition) { - source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(), - lastReadPosition, true, false); + source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(), + lastReadPosition, true); lastLoggedPosition = lastReadPosition; } http://git-wip-us.apache.org/repos/asf/hbase/blob/d36aacdf/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 5f8d0aa..6c46a85 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; -import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; @@ -187,11 +186,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer */ @Override public void startReplicationService() throws IOException { - 
try { - this.replicationManager.init(); - } catch (ReplicationException e) { - throw new IOException(e); - } + this.replicationManager.init(); this.replicationSink = new ReplicationSink(this.conf, this.server); this.scheduleThreadPool.scheduleAtFixedRate( new ReplicationStatisticsTask(this.replicationSink, this.replicationManager), @@ -210,9 +205,9 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer throws IOException { try { this.replicationManager.addHFileRefs(tableName, family, pairs); - } catch (ReplicationException e) { + } catch (IOException e) { LOG.error("Failed to add hfile references in the replication queue.", e); - throw new IOException(e); + throw e; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/d36aacdf/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index ffed88d..0092251 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -105,7 +105,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf // total number of edits we replicated private AtomicLong totalReplicatedEdits = new AtomicLong(0); // The znode we currently play with - protected String peerClusterZnode; + protected String queueId; // Maximum number of retries before taking bold actions private int maxRetriesMultiplier; // Indicates if this particular source is running @@ -141,14 +141,14 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf * @param fs file system to use * @param manager replication 
manager to ping to * @param server the server for this region server - * @param peerClusterZnode the name of our znode + * @param queueId the id of our replication queue * @param clusterId unique UUID for the cluster * @param metrics metrics for replication source */ @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, - String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider, + String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { this.server = server; this.conf = HBaseConfiguration.create(conf); @@ -167,8 +167,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf this.metrics = metrics; this.clusterId = clusterId; - this.peerClusterZnode = peerClusterZnode; - this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode); + this.queueId = queueId; + this.replicationQueueInfo = new ReplicationQueueInfo(queueId); // ReplicationQueueInfo parses the peerId out of the znode for us this.peerId = this.replicationQueueInfo.getPeerId(); this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); @@ -178,7 +178,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0); this.totalBufferUsed = manager.getTotalBufferUsed(); this.walFileLengthProvider = walFileLengthProvider; - LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId + LOG.info("queueId=" + queueId + ", ReplicationSource : " + peerId + ", currentBandwidth=" + this.currentBandwidth); } @@ -216,12 +216,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf @Override public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs) throws 
ReplicationException { - String peerId = peerClusterZnode; - if (peerId.contains("-")) { - // peerClusterZnode will be in the form peerId + "-" + rsZNode. - // A peerId will not have "-" in its name, see HBASE-11394 - peerId = peerClusterZnode.split("-")[0]; - } Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs(); if (tableCFMap != null) { List<String> tableCfs = tableCFMap.get(tableName); @@ -310,7 +304,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId " + peerClusterId + " which is not allowed by ReplicationEndpoint:" + replicationEndpoint.getClass().getName(), null, false); - this.manager.closeQueue(this); + this.manager.removeSource(this); return; } LOG.info("Replicating " + clusterId + " -> " + peerClusterId); @@ -355,7 +349,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf ReplicationSourceWALReader walReader = new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); return (ReplicationSourceWALReader) Threads.setDaemonThreadRunning(walReader, - threadName + ".replicationSource.wal-reader." + walGroupId + "," + peerClusterZnode, + threadName + ".replicationSource.wal-reader." 
+ walGroupId + "," + queueId, getUncaughtExceptionHandler()); } @@ -449,7 +443,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf LOG.error("Unexpected exception in ReplicationSource", e); } }; - Threads.setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode, + Threads.setDaemonThreadRunning(this, n + ".replicationSource," + this.queueId, handler); } @@ -465,9 +459,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf public void terminate(String reason, Exception cause, boolean join) { if (cause == null) { - LOG.info("Closing source " + this.peerClusterZnode + " because: " + reason); + LOG.info("Closing source " + this.queueId + " because: " + reason); } else { - LOG.error("Closing source " + this.peerClusterZnode + " because an error occurred: " + reason, + LOG.error("Closing source " + this.queueId + " because an error occurred: " + reason, cause); } this.sourceRunning = false; @@ -491,7 +485,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf .awaitTerminated(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS); } catch (TimeoutException te) { LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :" - + this.peerClusterZnode, + + this.queueId, te); } } @@ -499,8 +493,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf } @Override - public String getPeerClusterZnode() { - return this.peerClusterZnode; + public String getQueueId() { + return this.queueId; } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/d36aacdf/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java index 865a202..93e8331 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java @@ -32,8 +32,8 @@ public class ReplicationSourceFactory { private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceFactory.class); - static ReplicationSourceInterface create(Configuration conf, String peerId) { - ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId); + static ReplicationSourceInterface create(Configuration conf, String queueId) { + ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId); boolean isQueueRecovered = replicationQueueInfo.isQueueRecovered(); ReplicationSourceInterface src; try { http://git-wip-us.apache.org/repos/asf/hbase/blob/d36aacdf/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 4f10c73..d7cf9a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -51,7 +51,7 @@ public interface ReplicationSourceInterface { */ void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, - String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider, + String 
queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException; /** @@ -96,11 +96,11 @@ public interface ReplicationSourceInterface { Path getCurrentPath(); /** - * Get the id that the source is replicating to + * Get the queue id that the source is replicating to * - * @return peer cluster id + * @return queue id */ - String getPeerClusterZnode(); + String getQueueId(); /** * Get the id that the source is replicating to. http://git-wip-us.apache.org/repos/asf/hbase/blob/d36aacdf/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 968b3fb..6965f55 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; @@ -32,7 +30,7 @@ import java.util.SortedSet; import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; @@ -66,27 +64,53 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; /** - * This class is responsible to manage all the replication - * sources. There are two classes of sources: + * This class is responsible to manage all the replication sources. There are two classes of + * sources: * <ul> - * <li> Normal sources are persistent and one per peer cluster</li> - * <li> Old sources are recovered from a failed region server and our - * only goal is to finish replicating the WAL queue it had up in ZK</li> + * <li>Normal sources are persistent and one per peer cluster</li> + * <li>Old sources are recovered from a failed region server and our only goal is to finish + * replicating the WAL queue it had</li> + * </ul> + * <p> + * When a region server dies, this class uses a watcher to get notified and it tries to grab a lock + * in order to transfer all the queues in a local old source. + * <p> + * Synchronization specification: + * <ul> + * <li>No need synchronized on {@link #sources}. {@link #sources} is a ConcurrentHashMap and there + * is a Lock for peer id in {@link PeerProcedureHandlerImpl}. So there is no race for peer + * operations.</li> + * <li>Need synchronized on {@link #walsById}. There are four methods which modify it, + * {@link #addPeer(String)}, {@link #removePeer(String)}, + * {@link #cleanOldLogs(SortedSet, String, String)} and {@link #preLogRoll(Path)}. 
{@link #walsById} + * is a ConcurrentHashMap and there is a Lock for peer id in {@link PeerProcedureHandlerImpl}. So + * there is no race between {@link #addPeer(String)} and {@link #removePeer(String)}. + * {@link #cleanOldLogs(SortedSet, String, String)} is called by {@link ReplicationSourceInterface}. + * So no race with {@link #addPeer(String)}. {@link #removePeer(String)} will terminate the + * {@link ReplicationSourceInterface} firstly, then remove the wals from {@link #walsById}. So no + * race with {@link #removePeer(String)}. The only case need synchronized is + * {@link #cleanOldLogs(SortedSet, String, String)} and {@link #preLogRoll(Path)}.</li> + * <li>No need synchronized on {@link #walsByIdRecoveredQueues}. There are three methods which + * modify it, {@link #removePeer(String)} , {@link #cleanOldLogs(SortedSet, String, String)} and + * {@link ReplicationSourceManager.NodeFailoverWorker#run()}. + * {@link #cleanOldLogs(SortedSet, String, String)} is called by {@link ReplicationSourceInterface}. + * {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} firstly, then + * remove the wals from {@link #walsByIdRecoveredQueues}. And + * {@link ReplicationSourceManager.NodeFailoverWorker#run()} will add the wals to + * {@link #walsByIdRecoveredQueues} firstly, then start up a {@link ReplicationSourceInterface}. So + * there is no race here. For {@link ReplicationSourceManager.NodeFailoverWorker#run()} and + * {@link #removePeer(String)}, there is already synchronized on {@link #oldsources}. 
So no need + * synchronized on {@link #walsByIdRecoveredQueues}.</li> + * <li>Need synchronized on {@link #latestPaths} to avoid the new open source miss new log.</li> + * <li>Need synchronized on {@link #oldsources} to avoid adding recovered source for the + * to-be-removed peer.</li> * </ul> - * - * When a region server dies, this class uses a watcher to get notified and it - * tries to grab a lock in order to transfer all the queues in a local - * old source. - * - * This class implements the ReplicationListener interface so that it can track changes in - * replication state. */ @InterfaceAudience.Private public class ReplicationSourceManager implements ReplicationListener { - private static final Logger LOG = - LoggerFactory.getLogger(ReplicationSourceManager.class); - // List of all the sources that read this RS's logs - private final List<ReplicationSourceInterface> sources; + private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class); + // all the sources that read this RS's logs and every peer only has one replication source + private final ConcurrentMap<String, ReplicationSourceInterface> sources; // List of all the sources we got from died RSs private final List<ReplicationSourceInterface> oldsources; private final ReplicationQueueStorage queueStorage; @@ -96,11 +120,16 @@ public class ReplicationSourceManager implements ReplicationListener { private final UUID clusterId; // All about stopping private final Server server; + // All logs we are currently tracking - // Index structure of the map is: peer_id->logPrefix/logGroup->logs - private final Map<String, Map<String, SortedSet<String>>> walsById; + // Index structure of the map is: queue_id->logPrefix/logGroup->logs + // For normal replication source, the peer id is same with the queue id + private final ConcurrentMap<String, Map<String, SortedSet<String>>> walsById; // Logs for recovered sources we are currently tracking - private final Map<String, Map<String, 
SortedSet<String>>> walsByIdRecoveredQueues; + // the map is: queue_id->logPrefix/logGroup->logs + // For recovered source, the queue id's format is peer_id-servername-* + private final ConcurrentMap<String, Map<String, SortedSet<String>>> walsByIdRecoveredQueues; + private final Configuration conf; private final FileSystem fs; // The paths to the latest log of each wal group, for new coming peers @@ -136,22 +165,22 @@ public class ReplicationSourceManager implements ReplicationListener { ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf, Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId, WALFileLengthProvider walFileLengthProvider) throws IOException { - //CopyOnWriteArrayList is thread-safe. - //Generally, reading is more than modifying. - this.sources = new CopyOnWriteArrayList<>(); + // CopyOnWriteArrayList is thread-safe. + // Generally, reading is more than modifying. + this.sources = new ConcurrentHashMap<>(); this.queueStorage = queueStorage; this.replicationPeers = replicationPeers; this.replicationTracker = replicationTracker; this.server = server; - this.walsById = new HashMap<>(); + this.walsById = new ConcurrentHashMap<>(); this.walsByIdRecoveredQueues = new ConcurrentHashMap<>(); - this.oldsources = new CopyOnWriteArrayList<>(); + this.oldsources = new ArrayList<>(); this.conf = conf; this.fs = fs; this.logDir = logDir; this.oldLogDir = oldLogDir; - this.sleepBeforeFailover = - conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds + this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000); // 30 + // seconds this.clusterId = clusterId; this.walFileLengthProvider = walFileLengthProvider; this.replicationTracker.registerListener(this); @@ -160,8 +189,8 @@ public class ReplicationSourceManager implements ReplicationListener { int nbWorkers = conf.getInt("replication.executor.workers", 1); // use a short 100ms sleep since this could be done 
inline with a RS startup // even if we fail, other region servers can take care of it - this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, - 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); + this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>()); ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); tfb.setNameFormat("ReplicationExecutor-%d"); tfb.setDaemon(true); @@ -171,74 +200,22 @@ public class ReplicationSourceManager implements ReplicationListener { HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); } - @FunctionalInterface - private interface ReplicationQueueOperation { - void exec() throws ReplicationException; - } - - private void abortWhenFail(ReplicationQueueOperation op) { - try { - op.exec(); - } catch (ReplicationException e) { - server.abort("Failed to operate on replication queue", e); - } - } - /** - * Provide the id of the peer and a log key and this method will figure which - * wal it belongs to and will log, for this region server, the current - * position. It will also clean old logs from the queue. - * @param log Path to the log currently being replicated from - * replication status in zookeeper. It will also delete older entries. - * @param id id of the peer cluster - * @param position current location in the log - * @param queueRecovered indicates if this queue comes from another region server - * @param holdLogInZK if true then the log is retained in ZK - */ - public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered, - boolean holdLogInZK) { - String fileName = log.getName(); - abortWhenFail( - () -> this.queueStorage.setWALPosition(server.getServerName(), id, fileName, position)); - if (holdLogInZK) { - return; - } - cleanOldLogs(fileName, id, queueRecovered); - } - - /** - * Cleans a log file and all older files from ZK. Called when we are sure that a - * log file is closed and has no more entries. 
- * @param key Path to the log - * @param id id of the peer cluster - * @param queueRecovered Whether this is a recovered queue + * Adds a normal source per registered peer cluster and tries to process all old region server wal + * queues + * <p> + * The returned future is for adoptAbandonedQueues task. */ - public void cleanOldLogs(String key, String id, boolean queueRecovered) { - String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(key); - if (queueRecovered) { - SortedSet<String> wals = walsByIdRecoveredQueues.get(id).get(logPrefix); - if (wals != null && !wals.first().equals(key)) { - cleanOldLogs(wals, key, id); - } - } else { - synchronized (this.walsById) { - SortedSet<String> wals = walsById.get(id).get(logPrefix); - if (wals != null && !wals.first().equals(key)) { - cleanOldLogs(wals, key, id); - } + Future<?> init() throws IOException { + for (String id : this.replicationPeers.getAllPeerIds()) { + addSource(id); + if (replicationForBulkLoadDataEnabled) { + // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case + // when a peer was added before replication for bulk loaded data was enabled. 
+ throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(id)); } } - } - - private void cleanOldLogs(SortedSet<String> wals, String key, String id) { - SortedSet<String> walSet = wals.headSet(key); - if (LOG.isDebugEnabled()) { - LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet); - } - for (String wal : walSet) { - abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal)); - } - walSet.clear(); + return this.executor.submit(this::adoptAbandonedQueues); } private void adoptAbandonedQueues() { @@ -254,8 +231,8 @@ public class ReplicationSourceManager implements ReplicationListener { } List<ServerName> otherRegionServers = replicationTracker.getListOfRegionServers().stream() .map(ServerName::valueOf).collect(Collectors.toList()); - LOG.info("Current list of replicators: " + currentReplicators + " other RSs: " - + otherRegionServers); + LOG.info( + "Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers); // Look if there's anything to process after a restart for (ServerName rs : currentReplicators) { @@ -266,56 +243,112 @@ public class ReplicationSourceManager implements ReplicationListener { } /** - * Adds a normal source per registered peer cluster and tries to process all old region server wal - * queues - * <p> - * The returned future is for adoptAbandonedQueues task. + * 1. Add peer to replicationPeers 2. Add the normal source and related replication queue 3. 
Add + * HFile Refs + * @param peerId the id of replication peer */ - Future<?> init() throws IOException, ReplicationException { - for (String id : this.replicationPeers.getAllPeerIds()) { - addSource(id); + public void addPeer(String peerId) throws IOException { + boolean added = false; + try { + added = this.replicationPeers.addPeer(peerId); + } catch (ReplicationException e) { + throw new IOException(e); + } + if (added) { + addSource(peerId); if (replicationForBulkLoadDataEnabled) { - // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case - // when a peer was added before replication for bulk loaded data was enabled. - this.queueStorage.addPeerToHFileRefs(id); + throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(peerId)); } } - return this.executor.submit(this::adoptAbandonedQueues); } /** - * Add sources for the given peer cluster on this region server. For the newly added peer, we only - * need to enqueue the latest log of each wal group and do replication - * @param id the id of the peer cluster + * 1. Remove peer for replicationPeers 2. Remove all the recovered sources for the specified id + * and related replication queues 3. Remove the normal source and related replication queue 4. 
+ * Remove HFile Refs + * @param peerId the id of the replication peer + */ + public void removePeer(String peerId) { + replicationPeers.removePeer(peerId); + String terminateMessage = "Replication stream was removed by a user"; + List<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<>(); + // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer + // see NodeFailoverWorker.run + synchronized (this.oldsources) { + // First close all the recovered sources for this peer + for (ReplicationSourceInterface src : oldsources) { + if (peerId.equals(src.getPeerId())) { + oldSourcesToDelete.add(src); + } + } + for (ReplicationSourceInterface src : oldSourcesToDelete) { + src.terminate(terminateMessage); + removeRecoveredSource(src); + } + } + LOG.info( + "Number of deleted recovered sources for " + peerId + ": " + oldSourcesToDelete.size()); + // Now close the normal source for this peer + ReplicationSourceInterface srcToRemove = this.sources.get(peerId); + if (srcToRemove != null) { + srcToRemove.terminate(terminateMessage); + removeSource(srcToRemove); + } else { + // This only happened in unit test TestReplicationSourceManager#testPeerRemovalCleanup + // Delete queue from storage and memory and queue id is same with peer id for normal + // source + deleteQueue(peerId); + this.walsById.remove(peerId); + } + + // Remove HFile Refs + abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(peerId)); + } + + /** + * Factory method to create a replication source + * @param queueId the id of the replication queue + * @return the created source + */ + private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer) + throws IOException { + ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId); + + MetricsSource metrics = new MetricsSource(queueId); + // init replication source + src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId, + 
walFileLengthProvider, metrics); + return src; + } + + /** + * Add a normal source for the given peer on this region server. Meanwhile, add new replication + * queue to storage. For the newly added peer, we only need to enqueue the latest log of each wal + * group and do replication + * @param peerId the id of the replication peer * @return the source that was created */ @VisibleForTesting - ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException { - ReplicationPeer peer = replicationPeers.getPeer(id); - ReplicationSourceInterface src = getReplicationSource(id, peer); - synchronized (this.walsById) { - this.sources.add(src); + ReplicationSourceInterface addSource(String peerId) throws IOException { + ReplicationPeer peer = replicationPeers.getPeer(peerId); + ReplicationSourceInterface src = createSource(peerId, peer); + // synchronized on latestPaths to avoid missing the new log + synchronized (this.latestPaths) { + this.sources.put(peerId, src); Map<String, SortedSet<String>> walsByGroup = new HashMap<>(); - this.walsById.put(id, walsByGroup); + this.walsById.put(peerId, walsByGroup); // Add the latest wal to that source's queue - synchronized (latestPaths) { - if (this.latestPaths.size() > 0) { - for (Path logPath : latestPaths) { - String name = logPath.getName(); - String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name); - SortedSet<String> logs = new TreeSet<>(); - logs.add(name); - walsByGroup.put(walPrefix, logs); - try { - this.queueStorage.addWAL(server.getServerName(), id, name); - } catch (ReplicationException e) { - String message = "Cannot add log to queue when creating a new source, queueId=" + id + - ", filename=" + name; - server.stop(message); - throw e; - } - src.enqueueLog(logPath); - } + if (this.latestPaths.size() > 0) { + for (Path logPath : latestPaths) { + String name = logPath.getName(); + String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name); + SortedSet<String> logs = new 
TreeSet<>(); + logs.add(name); + walsByGroup.put(walPrefix, logs); + // Abort RS and throw exception to make add peer failed + abortAndThrowIOExceptionWhenFail( + () -> this.queueStorage.addWAL(server.getServerName(), peerId, name)); + src.enqueueLog(logPath); } } } @@ -323,89 +356,219 @@ public class ReplicationSourceManager implements ReplicationListener { return src; } - @VisibleForTesting - int getSizeOfLatestPath() { - synchronized (latestPaths) { - return latestPaths.size(); - } - } - /** - * Delete a complete queue of wals associated with a peer cluster - * @param peerId Id of the peer cluster queue of wals to delete + * Close the previous replication sources of this peer id and open new sources to trigger the new + * replication state changes or new replication config changes. Here we don't need to change + * replication queue storage and only to enqueue all logs to the new replication source + * @param peerId the id of the replication peer + * @throws IOException */ - public void deleteSource(String peerId, boolean closeConnection) { - abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), peerId)); - if (closeConnection) { - this.replicationPeers.removePeer(peerId); + public void refreshSources(String peerId) throws IOException { + String terminateMessage = "Peer " + peerId + + " state or config changed. 
Will close the previous replication source and open a new one"; + ReplicationPeer peer = replicationPeers.getPeer(peerId); + ReplicationSourceInterface src = createSource(peerId, peer); + // synchronized on latestPaths to avoid missing the new log + synchronized (this.latestPaths) { + ReplicationSourceInterface toRemove = this.sources.put(peerId, src); + if (toRemove != null) { + LOG.info("Terminate replication source for " + toRemove.getPeerId()); + toRemove.terminate(terminateMessage); + } + for (SortedSet<String> walsByGroup : walsById.get(peerId).values()) { + walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal))); + } } - } + LOG.info("Startup replication source for " + src.getPeerId()); + src.startup(); - /** - * Terminate the replication on this region server - */ - public void join() { - this.executor.shutdown(); - for (ReplicationSourceInterface source : this.sources) { - source.terminate("Region server is closing"); + List<ReplicationSourceInterface> toStartup = new ArrayList<>(); + // synchronized on oldsources to avoid race with NodeFailoverWorker + synchronized (this.oldsources) { + List<String> previousQueueIds = new ArrayList<>(); + for (ReplicationSourceInterface oldSource : this.oldsources) { + if (oldSource.getPeerId().equals(peerId)) { + previousQueueIds.add(oldSource.getQueueId()); + oldSource.terminate(terminateMessage); + this.oldsources.remove(oldSource); + } + } + for (String queueId : previousQueueIds) { + ReplicationSourceInterface replicationSource = createSource(queueId, peer); + this.oldsources.add(replicationSource); + for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) { + walsByGroup.forEach(wal -> src.enqueueLog(new Path(wal))); + } + toStartup.add(replicationSource); + } + } + for (ReplicationSourceInterface replicationSource : oldsources) { + replicationSource.startup(); } } /** - * Get a copy of the wals of the first source on this rs - * @return a sorted set of wal names + * Clear 
the metrics and related replication queue of the specified old source + * @param src source to clear */ - @VisibleForTesting - Map<String, Map<String, SortedSet<String>>> getWALs() { - return Collections.unmodifiableMap(walsById); + void removeRecoveredSource(ReplicationSourceInterface src) { + LOG.info("Done with the recovered queue " + src.getQueueId()); + src.getSourceMetrics().clear(); + this.oldsources.remove(src); + // Delete queue from storage and memory + deleteQueue(src.getQueueId()); + this.walsByIdRecoveredQueues.remove(src.getQueueId()); } /** - * Get a copy of the wals of the recovered sources on this rs - * @return a sorted set of wal names + * Clear the metrics and related replication queue of the specified old source + * @param src source to clear */ - @VisibleForTesting - Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() { - return Collections.unmodifiableMap(walsByIdRecoveredQueues); + void removeSource(ReplicationSourceInterface src) { + LOG.info("Done with the queue " + src.getQueueId()); + src.getSourceMetrics().clear(); + this.sources.remove(src.getPeerId()); + // Delete queue from storage and memory + deleteQueue(src.getQueueId()); + this.walsById.remove(src.getQueueId()); } /** - * Get a list of all the normal sources of this rs - * @return lis of all sources + * Delete a complete queue of wals associated with a replication source + * @param queueId the id of replication queue to delete */ - public List<ReplicationSourceInterface> getSources() { - return this.sources; + private void deleteQueue(String queueId) { + abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), queueId)); + } + + @FunctionalInterface + private interface ReplicationQueueOperation { + void exec() throws ReplicationException; + } + + private void abortWhenFail(ReplicationQueueOperation op) { + try { + op.exec(); + } catch (ReplicationException e) { + server.abort("Failed to operate on replication queue", e); + } + } + + private 
void throwIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException { + try { + op.exec(); + } catch (ReplicationException e) { + throw new IOException(e); + } + } + + private void abortAndThrowIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException { + try { + op.exec(); + } catch (ReplicationException e) { + server.abort("Failed to operate on replication queue", e); + throw new IOException(e); + } } /** - * Get a list of all the old sources of this rs - * @return list of all old sources + * This method will log the current position to storage. And also clean old logs from the + * replication queue. + * @param log Path to the log currently being replicated + * @param queueId id of the replication queue + * @param position current location in the log + * @param queueRecovered indicates if this queue comes from another region server */ - public List<ReplicationSourceInterface> getOldSources() { - return this.oldsources; + public void logPositionAndCleanOldLogs(Path log, String queueId, long position, + boolean queueRecovered) { + String fileName = log.getName(); + abortWhenFail( + () -> this.queueStorage.setWALPosition(server.getServerName(), queueId, fileName, position)); + cleanOldLogs(fileName, queueId, queueRecovered); } /** - * Get the normal source for a given peer - * @param peerId - * @return the normal source for the give peer if it exists, otherwise null. + * Cleans a log file and all older logs from replication queue. Called when we are sure that a log + * file is closed and has no more entries. 
+ * @param log Path to the log + * @param queueId id of the replication queue + * @param queueRecovered Whether this is a recovered queue */ - public ReplicationSourceInterface getSource(String peerId) { - return getSources().stream().filter(s -> s.getPeerId().equals(peerId)).findFirst().orElse(null); + @VisibleForTesting + void cleanOldLogs(String log, String queueId, boolean queueRecovered) { + String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log); + if (queueRecovered) { + SortedSet<String> wals = walsByIdRecoveredQueues.get(queueId).get(logPrefix); + if (wals != null && !wals.first().equals(log)) { + cleanOldLogs(wals, log, queueId); + } + } else { + // synchronized on walsById to avoid race with preLogRoll + synchronized (this.walsById) { + SortedSet<String> wals = walsById.get(queueId).get(logPrefix); + if (wals != null && !wals.first().equals(log)) { + cleanOldLogs(wals, log, queueId); + } + } + } } - @VisibleForTesting - List<String> getAllQueues() throws ReplicationException { - return queueStorage.getAllQueues(server.getServerName()); + private void cleanOldLogs(SortedSet<String> wals, String key, String id) { + SortedSet<String> walSet = wals.headSet(key); + if (LOG.isDebugEnabled()) { + LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet); + } + for (String wal : walSet) { + abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal)); + } + walSet.clear(); } // public because of we call it in TestReplicationEmptyWALRecovery @VisibleForTesting public void preLogRoll(Path newLog) throws IOException { - recordLog(newLog); String logName = newLog.getName(); String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName); - synchronized (latestPaths) { + // synchronized on latestPaths to avoid the new open source miss the new log + synchronized (this.latestPaths) { + // Add log to queue storage + for (ReplicationSourceInterface source : this.sources.values()) { + // If record log to queue 
storage failed, abort RS and throw exception to make log roll + // failed + abortAndThrowIOExceptionWhenFail( + () -> this.queueStorage.addWAL(server.getServerName(), source.getQueueId(), logName)); + } + + // synchronized on walsById to avoid race with cleanOldLogs + synchronized (this.walsById) { + // Update walsById map + for (Map.Entry<String, Map<String, SortedSet<String>>> entry : this.walsById.entrySet()) { + String peerId = entry.getKey(); + Map<String, SortedSet<String>> walsByPrefix = entry.getValue(); + boolean existingPrefix = false; + for (Map.Entry<String, SortedSet<String>> walsEntry : walsByPrefix.entrySet()) { + SortedSet<String> wals = walsEntry.getValue(); + if (this.sources.isEmpty()) { + // If there's no slaves, don't need to keep the old wals since + // we only consider the last one when a new slave comes in + wals.clear(); + } + if (logPrefix.equals(walsEntry.getKey())) { + wals.add(logName); + existingPrefix = true; + } + } + if (!existingPrefix) { + // The new log belongs to a new group, add it into this peer + LOG.debug("Start tracking logs for wal group " + logPrefix + " for peer " + peerId); + SortedSet<String> wals = new TreeSet<>(); + wals.add(logName); + walsByPrefix.put(logPrefix, wals); + } + } + } + + // Add to latestPaths Iterator<Path> iterator = latestPaths.iterator(); while (iterator.hasNext()) { Path path = iterator.next(); @@ -418,89 +581,23 @@ public class ReplicationSourceManager implements ReplicationListener { } } - /** - * Check and enqueue the given log to the correct source. 
If there's still no source for the - * group to which the given log belongs, create one - * @param logPath the log path to check and enqueue - * @throws IOException - */ - private void recordLog(Path logPath) throws IOException { - String logName = logPath.getName(); - String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName); - // update replication queues on ZK - // synchronize on replicationPeers to avoid adding source for the to-be-removed peer - synchronized (replicationPeers) { - for (String id : replicationPeers.getAllPeerIds()) { - try { - this.queueStorage.addWAL(server.getServerName(), id, logName); - } catch (ReplicationException e) { - throw new IOException("Cannot add log to replication queue" - + " when creating a new source, queueId=" + id + ", filename=" + logName, e); - } - } - } - // update walsById map - synchronized (walsById) { - for (Map.Entry<String, Map<String, SortedSet<String>>> entry : this.walsById.entrySet()) { - String peerId = entry.getKey(); - Map<String, SortedSet<String>> walsByPrefix = entry.getValue(); - boolean existingPrefix = false; - for (Map.Entry<String, SortedSet<String>> walsEntry : walsByPrefix.entrySet()) { - SortedSet<String> wals = walsEntry.getValue(); - if (this.sources.isEmpty()) { - // If there's no slaves, don't need to keep the old wals since - // we only consider the last one when a new slave comes in - wals.clear(); - } - if (logPrefix.equals(walsEntry.getKey())) { - wals.add(logName); - existingPrefix = true; - } - } - if (!existingPrefix) { - // The new log belongs to a new group, add it into this peer - LOG.debug("Start tracking logs for wal group " + logPrefix + " for peer " + peerId); - SortedSet<String> wals = new TreeSet<>(); - wals.add(logName); - walsByPrefix.put(logPrefix, wals); - } - } - } - } - // public because of we call it in TestReplicationEmptyWALRecovery @VisibleForTesting public void postLogRoll(Path newLog) throws IOException { // This only updates the sources we own, not 
the recovered ones - for (ReplicationSourceInterface source : this.sources) { + for (ReplicationSourceInterface source : this.sources.values()) { source.enqueueLog(newLog); } } - @VisibleForTesting - public AtomicLong getTotalBufferUsed() { - return totalBufferUsed; - } - - /** - * Factory method to create a replication source - * @param peerId the id of the peer cluster - * @return the created source - */ - private ReplicationSourceInterface getReplicationSource(String peerId, - ReplicationPeer replicationPeer) throws IOException { - ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, peerId); - - MetricsSource metrics = new MetricsSource(peerId); - // init replication source - src.init(conf, fs, this, queueStorage, replicationPeer, server, peerId, clusterId, - walFileLengthProvider, metrics); - return src; + @Override + public void regionServerRemoved(String regionserver) { + transferQueues(ServerName.valueOf(regionserver)); } /** * Transfer all the queues of the specified to this region server. First it tries to grab a lock - * and if it works it will move the znodes and finally will delete the old znodes. + * and if it works it will move the old queues and finally will delete the old queues. * <p> * It creates one old source for any type of source of the old rs. 
*/ @@ -518,102 +615,8 @@ public class ReplicationSourceManager implements ReplicationListener { } /** - * Clear the references to the specified old source - * @param src source to clear - */ - public void closeRecoveredQueue(ReplicationSourceInterface src) { - LOG.info("Done with the recovered queue " + src.getPeerClusterZnode()); - if (src instanceof ReplicationSource) { - ((ReplicationSource) src).getSourceMetrics().clear(); - } - this.oldsources.remove(src); - deleteSource(src.getPeerClusterZnode(), false); - this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode()); - } - - /** - * Clear the references to the specified old source - * @param src source to clear - */ - public void closeQueue(ReplicationSourceInterface src) { - LOG.info("Done with the queue " + src.getPeerClusterZnode()); - src.getSourceMetrics().clear(); - this.sources.remove(src); - deleteSource(src.getPeerClusterZnode(), true); - this.walsById.remove(src.getPeerClusterZnode()); - } - - public void addPeer(String id) throws ReplicationException, IOException { - LOG.info("Trying to add peer, peerId: " + id); - boolean added = this.replicationPeers.addPeer(id); - if (added) { - LOG.info("Peer " + id + " connected success, trying to start the replication source thread."); - addSource(id); - if (replicationForBulkLoadDataEnabled) { - this.queueStorage.addPeerToHFileRefs(id); - } - } - } - - /** - * Thie method first deletes all the recovered sources for the specified - * id, then deletes the normal source (deleting all related data in ZK). 
- * @param id The id of the peer cluster - */ - public void removePeer(String id) { - LOG.info("Closing the following queue " + id + ", currently have " - + sources.size() + " and another " - + oldsources.size() + " that were recovered"); - String terminateMessage = "Replication stream was removed by a user"; - List<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<>(); - // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer - // see NodeFailoverWorker.run - synchronized (oldsources) { - // First close all the recovered sources for this peer - for (ReplicationSourceInterface src : oldsources) { - if (id.equals(src.getPeerId())) { - oldSourcesToDelete.add(src); - } - } - for (ReplicationSourceInterface src : oldSourcesToDelete) { - src.terminate(terminateMessage); - closeRecoveredQueue(src); - } - } - LOG.info("Number of deleted recovered sources for " + id + ": " - + oldSourcesToDelete.size()); - // Now look for the one on this cluster - List<ReplicationSourceInterface> srcToRemove = new ArrayList<>(); - // synchronize on replicationPeers to avoid adding source for the to-be-removed peer - synchronized (this.replicationPeers) { - for (ReplicationSourceInterface src : this.sources) { - if (id.equals(src.getPeerId())) { - srcToRemove.add(src); - } - } - if (srcToRemove.isEmpty()) { - LOG.error("The peer we wanted to remove is missing a ReplicationSourceInterface. " + - "This could mean that ReplicationSourceInterface initialization failed for this peer " + - "and that replication on this peer may not be caught up. 
peerId=" + id); - } - for (ReplicationSourceInterface toRemove : srcToRemove) { - toRemove.terminate(terminateMessage); - closeQueue(toRemove); - } - deleteSource(id, true); - } - // Remove HFile Refs znode from zookeeper - abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(id)); - } - - @Override - public void regionServerRemoved(String regionserver) { - transferQueues(ServerName.valueOf(regionserver)); - } - - /** - * Class responsible to setup new ReplicationSources to take care of the - * queues from dead region servers. + * Class responsible to setup new ReplicationSources to take care of the queues from dead region + * servers. */ class NodeFailoverWorker extends Thread { @@ -643,10 +646,10 @@ public class ReplicationSourceManager implements ReplicationListener { } Map<String, Set<String>> newQueues = new HashMap<>(); try { - List<String> peers = queueStorage.getAllQueues(deadRS); - while (!peers.isEmpty()) { + List<String> queues = queueStorage.getAllQueues(deadRS); + while (!queues.isEmpty()) { Pair<String, SortedSet<String>> peer = queueStorage.claimQueue(deadRS, - peers.get(ThreadLocalRandom.current().nextInt(peers.size())), server.getServerName()); + queues.get(ThreadLocalRandom.current().nextInt(queues.size())), server.getServerName()); long sleep = sleepBeforeFailover / 2; if (!peer.getSecond().isEmpty()) { newQueues.put(peer.getFirst(), peer.getSecond()); @@ -658,9 +661,9 @@ public class ReplicationSourceManager implements ReplicationListener { LOG.warn("Interrupted while waiting before transferring a queue."); Thread.currentThread().interrupt(); } - peers = queueStorage.getAllQueues(deadRS); + queues = queueStorage.getAllQueues(deadRS); } - if (!peers.isEmpty()) { + if (queues.isEmpty()) { queueStorage.removeReplicatorIfQueueIsEmpty(deadRS); } } catch (ReplicationException e) { @@ -675,18 +678,18 @@ public class ReplicationSourceManager implements ReplicationListener { } for (Map.Entry<String, Set<String>> entry : newQueues.entrySet()) { - 
String peerId = entry.getKey(); + String queueId = entry.getKey(); Set<String> walsSet = entry.getValue(); try { // there is not an actual peer defined corresponding to peerId for the failover. - ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId); + ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId); String actualPeerId = replicationQueueInfo.getPeerId(); ReplicationPeer peer = replicationPeers.getPeer(actualPeerId); if (peer == null) { - LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS - + ", peer is null"); - abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId)); + LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS + + ", peer is null"); + abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), queueId)); continue; } if (server instanceof ReplicationSyncUp.DummyServer @@ -698,7 +701,7 @@ public class ReplicationSourceManager implements ReplicationListener { } // track sources in walsByIdRecoveredQueues Map<String, SortedSet<String>> walsByGroup = new HashMap<>(); - walsByIdRecoveredQueues.put(peerId, walsByGroup); + walsByIdRecoveredQueues.put(queueId, walsByGroup); for (String wal : walsSet) { String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal); SortedSet<String> wals = walsByGroup.get(walPrefix); @@ -709,14 +712,12 @@ public class ReplicationSourceManager implements ReplicationListener { wals.add(wal); } - // enqueue sources - ReplicationSourceInterface src = getReplicationSource(peerId, peer); + ReplicationSourceInterface src = createSource(queueId, peer); // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer - // see removePeer synchronized (oldsources) { if (!replicationPeers.getAllPeerIds().contains(src.getPeerId())) { src.terminate("Recovered queue doesn't belong to any current peer"); - closeRecoveredQueue(src); + removeRecoveredSource(src); continue; } 
oldsources.add(src); @@ -734,6 +735,82 @@ public class ReplicationSourceManager implements ReplicationListener { } /** + * Terminate the replication on this region server + */ + public void join() { + this.executor.shutdown(); + for (ReplicationSourceInterface source : this.sources.values()) { + source.terminate("Region server is closing"); + } + } + + /** + * Get a read-only view of the wals of the normal sources on this rs + * @return an unmodifiable map of peer id to wal group prefix to sorted wal names + */ + @VisibleForTesting + Map<String, Map<String, SortedSet<String>>> getWALs() { + return Collections.unmodifiableMap(walsById); + } + + /** + * Get a read-only view of the wals of the recovered sources on this rs + * @return an unmodifiable map of queue id to wal group prefix to sorted wal names + */ + @VisibleForTesting + Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() { + return Collections.unmodifiableMap(walsByIdRecoveredQueues); + } + + /** + * Get a list of all the normal sources of this rs + * @return list of all normal sources + */ + public List<ReplicationSourceInterface> getSources() { + return new ArrayList<>(this.sources.values()); + } + + /** + * Get a list of all the recovered sources of this rs + * @return list of all recovered sources + */ + public List<ReplicationSourceInterface> getOldSources() { + return this.oldsources; + } + + /** + * Get the normal source for a given peer + * @return the normal source for the given peer if it exists, otherwise null.
+ */ + @VisibleForTesting + public ReplicationSourceInterface getSource(String peerId) { + return this.sources.get(peerId); + } + + @VisibleForTesting + List<String> getAllQueues() throws IOException { + List<String> allQueues = Collections.emptyList(); + try { + allQueues = queueStorage.getAllQueues(server.getServerName()); + } catch (ReplicationException e) { + throw new IOException(e); + } + return allQueues; + } + + @VisibleForTesting + int getSizeOfLatestPath() { + synchronized (latestPaths) { + return latestPaths.size(); + } + } + + @VisibleForTesting + public AtomicLong getTotalBufferUsed() { + return totalBufferUsed; + } + + /** * Get the directory where wals are archived * @return the directory where wals are archived */ @@ -761,28 +838,30 @@ public class ReplicationSourceManager implements ReplicationListener { * Get the ReplicationPeers used by this ReplicationSourceManager * @return the ReplicationPeers used by this ReplicationSourceManager */ - public ReplicationPeers getReplicationPeers() {return this.replicationPeers;} + public ReplicationPeers getReplicationPeers() { + return this.replicationPeers; + } /** * Get a string representation of all the sources' metrics */ public String getStats() { StringBuilder stats = new StringBuilder(); - for (ReplicationSourceInterface source : sources) { + for (ReplicationSourceInterface source : this.sources.values()) { stats.append("Normal source for cluster " + source.getPeerId() + ": "); stats.append(source.getStats() + "\n"); } for (ReplicationSourceInterface oldSource : oldsources) { - stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerId()+": "); - stats.append(oldSource.getStats()+ "\n"); + stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerId() + ": "); + stats.append(oldSource.getStats() + "\n"); } return stats.toString(); } public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs) - throws ReplicationException { - for 
(ReplicationSourceInterface source : this.sources) { - source.addHFileRefs(tableName, family, pairs); + throws IOException { + for (ReplicationSourceInterface source : this.sources.values()) { + throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs)); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/d36aacdf/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index ced2980..959f676 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -223,15 +223,15 @@ public class ReplicationSourceShipper extends Thread { } protected void updateLogPosition(long lastReadPosition) { - source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(), - lastReadPosition, false, false); + source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(), + lastReadPosition, false); lastLoggedPosition = lastReadPosition; } public void startup(UncaughtExceptionHandler handler) { String name = Thread.currentThread().getName(); Threads.setDaemonThreadRunning(this, name + ".replicationSource." 
+ walGroupId + "," - + source.getPeerClusterZnode(), handler); + + source.getQueueId(), handler); } public PriorityBlockingQueue<Path> getLogQueue() { http://git-wip-us.apache.org/repos/asf/hbase/blob/d36aacdf/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index b6b50ad..579d20f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -111,7 +111,7 @@ public class ReplicationSourceWALReader extends Thread { this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false); this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount); - LOG.info("peerClusterZnode=" + source.getPeerClusterZnode() + LOG.info("peerClusterZnode=" + source.getQueueId() + ", ReplicationSourceWALReaderThread : " + source.getPeerId() + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity http://git-wip-us.apache.org/repos/asf/hbase/blob/d36aacdf/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index 38ec598..ff20ddc 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -89,7 +89,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { } @Override - public String getPeerClusterZnode() { + public String getQueueId() { return peerClusterId; }