This is an automated email from the ASF dual-hosted git repository.

sunxin pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 8481d547fd14493570e2b32f6b95f0a5c0e15536
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Mon Jul 13 17:35:32 2020 +0800

    HBASE-24681 Remove the cache walsById/walsByIdRecoveredQueues from ReplicationSourceManager (#2019)
    
    Signed-off-by: Wellington Chevreuil <wchevre...@apache.org>
---
 .../regionserver/ReplicationSourceManager.java     | 204 +++++++--------------
 1 file changed, 62 insertions(+), 142 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index ad7c033..db12c00 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -93,30 +93,6 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
  * <li>No need synchronized on {@link #sources}. {@link #sources} is a ConcurrentHashMap and there
  * is a Lock for peer id in {@link PeerProcedureHandlerImpl}. So there is no race for peer
  * operations.</li>
- * <li>Need synchronized on {@link #walsById}. There are four methods which modify it,
- * {@link #addPeer(String)}, {@link #removePeer(String)},
- * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and {@link #preLogRoll(Path)}.
- * {@link #walsById} is a ConcurrentHashMap and there is a Lock for peer id in
- * {@link PeerProcedureHandlerImpl}. So there is no race between {@link #addPeer(String)} and
- * {@link #removePeer(String)}. {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)}
- * is called by {@link ReplicationSourceInterface}. So no race with {@link #addPeer(String)}.
- * {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} firstly, then
- * remove the wals from {@link #walsById}. So no race with {@link #removePeer(String)}. The only
- * case need synchronized is {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
- * {@link #preLogRoll(Path)}.</li>
- * <li>No need synchronized on {@link #walsByIdRecoveredQueues}. There are three methods which
- * modify it, {@link #removePeer(String)} ,
- * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
- * {@link ReplicationSourceManager#claimQueue(ServerName, String)}.
- * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} is called by
- * {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the
- * {@link ReplicationSourceInterface} firstly, then remove the wals from
- * {@link #walsByIdRecoveredQueues}. And
- * {@link ReplicationSourceManager#claimQueue(ServerName, String)} will add the wals to
- * {@link #walsByIdRecoveredQueues} firstly, then start up a {@link ReplicationSourceInterface}. So
- * there is no race here. For {@link ReplicationSourceManager#claimQueue(ServerName, String)} and
- * {@link #removePeer(String)}, there is already synchronized on {@link #oldsources}. So no need
- * synchronized on {@link #walsByIdRecoveredQueues}.</li>
  * <li>Need synchronized on {@link #latestPaths} to avoid the new open source miss new log.</li>
  * <li>Need synchronized on {@link #oldsources} to avoid adding recovered source for the
  * to-be-removed peer.</li>
@@ -144,15 +120,6 @@ public class ReplicationSourceManager {
   // All about stopping
   private final Server server;
 
-  // All logs we are currently tracking
-  // Index structure of the map is: queue_id->logPrefix/logGroup->logs
-  // For normal replication source, the peer id is same with the queue id
-  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsById;
-  // Logs for recovered sources we are currently tracking
-  // the map is: queue_id->logPrefix/logGroup->logs
-  // For recovered source, the queue id's format is peer_id-servername-*
-  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues;
-
   private final SyncReplicationPeerMappingManager syncReplicationPeerMappingManager;
 
   private final Configuration conf;
@@ -212,8 +179,6 @@ public class ReplicationSourceManager {
     this.queueStorage = queueStorage;
     this.replicationPeers = replicationPeers;
     this.server = server;
-    this.walsById = new ConcurrentHashMap<>();
-    this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
     this.oldsources = new ArrayList<>();
     this.conf = conf;
     this.fs = fs;
@@ -322,7 +287,6 @@ public class ReplicationSourceManager {
       // Delete queue from storage and memory and queue id is same with peer id for normal
       // source
       deleteQueue(peerId);
-      this.walsById.remove(peerId);
     }
     ReplicationPeerConfig peerConfig = peer.getPeerConfig();
     if (peerConfig.isSyncReplication()) {
@@ -364,15 +328,10 @@ public class ReplicationSourceManager {
     // synchronized on latestPaths to avoid missing the new log
     synchronized (this.latestPaths) {
       this.sources.put(peerId, src);
-      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
-      this.walsById.put(peerId, walsByGroup);
       // Add the latest wal to that source's queue
       if (!latestPaths.isEmpty()) {
         for (Map.Entry<String, Path> walPrefixAndPath : latestPaths.entrySet()) {
           Path walPath = walPrefixAndPath.getValue();
-          NavigableSet<String> wals = new TreeSet<>();
-          wals.add(walPath.getName());
-          walsByGroup.put(walPrefixAndPath.getKey(), wals);
           // Abort RS and throw exception to make add peer failed
           abortAndThrowIOExceptionWhenFail(
             () -> this.queueStorage.addWAL(server.getServerName(), peerId, walPath.getName()));
@@ -426,7 +385,10 @@ public class ReplicationSourceManager {
       // map from walsById since later we may fail to delete them from the replication queue
       // storage, and when we retry next time, we can not know the wal files that need to be deleted
       // from the replication queue storage.
-      walsById.get(peerId).forEach((k, v) -> wals.put(k, new TreeSet<>(v)));
+      this.queueStorage.getWALsInQueue(this.server.getServerName(), peerId).forEach(wal -> {
+        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+        wals.computeIfAbsent(walPrefix, p -> new TreeSet<>()).add(wal);
+      });
     }
     LOG.info("Startup replication source for " + src.getPeerId());
     src.startup();
@@ -435,15 +397,6 @@ public class ReplicationSourceManager {
         queueStorage.removeWAL(server.getServerName(), peerId, wal);
       }
     }
-    synchronized (walsById) {
-      Map<String, NavigableSet<String>> oldWals = walsById.get(peerId);
-      wals.forEach((k, v) -> {
-        NavigableSet<String> walsByGroup = oldWals.get(k);
-        if (walsByGroup != null) {
-          walsByGroup.removeAll(v);
-        }
-      });
-    }
     // synchronized on oldsources to avoid race with NodeFailoverWorker. Since NodeFailoverWorker is
     // a background task, we will delete the file from replication queue storage under the lock to
     // simplify the logic.
@@ -455,7 +408,6 @@ public class ReplicationSourceManager {
           oldSource.terminate(terminateMessage);
           oldSource.getSourceMetrics().clear();
           queueStorage.removeQueue(server.getServerName(), queueId);
-          walsByIdRecoveredQueues.remove(queueId);
           iter.remove();
         }
       }
@@ -468,7 +420,7 @@ public class ReplicationSourceManager {
    * replication queue storage and only to enqueue all logs to the new replication source
    * @param peerId the id of the replication peer
    */
-  public void refreshSources(String peerId) throws IOException {
+  public void refreshSources(String peerId) throws ReplicationException, IOException {
     String terminateMessage = "Peer " + peerId +
       " state or config changed. Will close the previous replication source and open a new one";
     ReplicationPeer peer = replicationPeers.getPeer(peerId);
@@ -481,9 +433,8 @@ public class ReplicationSourceManager {
         // Do not clear metrics
         toRemove.terminate(terminateMessage, null, false);
       }
-      for (NavigableSet<String> walsByGroup : walsById.get(peerId).values()) {
-        walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
-      }
+      this.queueStorage.getWALsInQueue(this.server.getServerName(), peerId)
+        .forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
     }
     LOG.info("Startup replication source for " + src.getPeerId());
     src.startup();
@@ -504,9 +455,8 @@ public class ReplicationSourceManager {
       for (String queueId : previousQueueIds) {
         ReplicationSourceInterface recoveredReplicationSource = createSource(queueId, peer);
         this.oldsources.add(recoveredReplicationSource);
-        for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) {
-          walsByGroup.forEach(wal -> recoveredReplicationSource.enqueueLog(new Path(wal)));
-        }
+        this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId)
+          .forEach(wal -> recoveredReplicationSource.enqueueLog(new Path(wal)));
         toStartup.add(recoveredReplicationSource);
       }
     }
@@ -526,7 +476,6 @@ public class ReplicationSourceManager {
     LOG.info("Done with the recovered queue {}", src.getQueueId());
     // Delete queue from storage and memory
     deleteQueue(src.getQueueId());
-    this.walsByIdRecoveredQueues.remove(src.getQueueId());
     return true;
   }
 
@@ -549,8 +498,6 @@ public class ReplicationSourceManager {
     this.sources.remove(src.getPeerId());
     // Delete queue from storage and memory
     deleteQueue(src.getQueueId());
-    this.walsById.remove(src.getQueueId());
-
   }
 
   /**
@@ -635,42 +582,19 @@ public class ReplicationSourceManager {
    * @param inclusive whether we should also remove the given log file
    * @param source the replication source
   */
-  void cleanOldLogs(String log, boolean inclusive, ReplicationSourceInterface source) {
-    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
-    if (source.isRecovered()) {
-      NavigableSet<String> wals = walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix);
-      if (wals != null) {
-        NavigableSet<String> walsToRemove = wals.headSet(log, inclusive);
-        if (walsToRemove.isEmpty()) {
-          return;
-        }
-        cleanOldLogs(walsToRemove, source);
-        walsToRemove.clear();
-      }
-    } else {
-      NavigableSet<String> wals;
-      NavigableSet<String> walsToRemove;
-      // synchronized on walsById to avoid race with preLogRoll
-      synchronized (this.walsById) {
-        wals = walsById.get(source.getQueueId()).get(logPrefix);
-        if (wals == null) {
-          return;
-        }
-        walsToRemove = wals.headSet(log, inclusive);
-        if (walsToRemove.isEmpty()) {
-          return;
-        }
-        walsToRemove = new TreeSet<>(walsToRemove);
-      }
-      // cleanOldLogs may spend some time, especially for sync replication where we may want to
-      // remove remote wals as the remote cluster may have already been down, so we do it outside
-      // the lock to avoid block preLogRoll
-      cleanOldLogs(walsToRemove, source);
-      // now let's remove the files in the set
-      synchronized (this.walsById) {
-        wals.removeAll(walsToRemove);
-      }
-    }
+  void cleanOldLogs(String log, boolean inclusive,
+    ReplicationSourceInterface source) {
+    NavigableSet<String> walsToRemove;
+    synchronized (this.latestPaths) {
+      walsToRemove = getWalsToRemove(source.getQueueId(), log, inclusive);
+    }
+    if (walsToRemove.isEmpty()) {
+      return;
+    }
+    // cleanOldLogs may spend some time, especially for sync replication where we may want to
+    // remove remote wals as the remote cluster may have already been down, so we do it outside
+    // the lock to avoid block preLogRoll
+    cleanOldLogs(walsToRemove, source);
   }
 
   private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
@@ -750,37 +674,6 @@ public class ReplicationSourceManager {
       abortAndThrowIOExceptionWhenFail(
         () -> this.queueStorage.addWAL(server.getServerName(), source.getQueueId(), logName));
     }
-
-    // synchronized on walsById to avoid race with cleanOldLogs
-    synchronized (this.walsById) {
-      // Update walsById map
-      for (Map.Entry<String, Map<String, NavigableSet<String>>> entry : this.walsById
-        .entrySet()) {
-        String peerId = entry.getKey();
-        Map<String, NavigableSet<String>> walsByPrefix = entry.getValue();
-        boolean existingPrefix = false;
-        for (Map.Entry<String, NavigableSet<String>> walsEntry : walsByPrefix.entrySet()) {
-          SortedSet<String> wals = walsEntry.getValue();
-          if (this.sources.isEmpty()) {
-            // If there's no slaves, don't need to keep the old wals since
-            // we only consider the last one when a new slave comes in
-            wals.clear();
-          }
-          if (logPrefix.equals(walsEntry.getKey())) {
-            wals.add(logName);
-            existingPrefix = true;
-          }
-        }
-        if (!existingPrefix) {
-          // The new log belongs to a new group, add it into this peer
-          LOG.debug("Start tracking logs for wal group {} for peer {}", logPrefix, peerId);
-          NavigableSet<String> wals = new TreeSet<>();
-          wals.add(logName);
-          walsByPrefix.put(logPrefix, wals);
-        }
-      }
-    }
-
     // Add to latestPaths
     latestPaths.put(logPrefix, newLog);
   }
@@ -887,18 +780,6 @@ public class ReplicationSourceManager {
           return;
         }
       }
-      // track sources in walsByIdRecoveredQueues
-      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
-      walsByIdRecoveredQueues.put(queueId, walsByGroup);
-      for (String wal : walsSet) {
-        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
-        NavigableSet<String> wals = walsByGroup.get(walPrefix);
-        if (wals == null) {
-          wals = new TreeSet<>();
-          walsByGroup.put(walPrefix, wals);
-        }
-        wals.add(wal);
-      }
       oldsources.add(src);
       LOG.info("Added source for recovered queue {}", src.getQueueId());
       for (String wal : walsSet) {
@@ -926,7 +807,18 @@ public class ReplicationSourceManager {
    * Get a copy of the wals of the normal sources on this rs
    * @return a sorted set of wal names
   */
-  public Map<String, Map<String, NavigableSet<String>>> getWALs() {
+  public Map<String, Map<String, NavigableSet<String>>> getWALs()
+    throws ReplicationException {
+    Map<String, Map<String, NavigableSet<String>>> walsById = new HashMap<>();
+    for (ReplicationSourceInterface source : sources.values()) {
+      String queueId = source.getQueueId();
+      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
+      walsById.put(queueId, walsByGroup);
+      for (String wal : this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId)) {
+        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+        walsByGroup.computeIfAbsent(walPrefix, p -> new TreeSet<>()).add(wal);
+      }
+    }
     return Collections.unmodifiableMap(walsById);
   }
 
@@ -934,7 +826,18 @@ public class ReplicationSourceManager {
    * Get a copy of the wals of the recovered sources on this rs
    * @return a sorted set of wal names
   */
-  Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues() {
+  Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues()
+    throws ReplicationException {
+    Map<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues = new HashMap<>();
+    for (ReplicationSourceInterface source : oldsources) {
+      String queueId = source.getQueueId();
+      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
+      walsByIdRecoveredQueues.put(queueId, walsByGroup);
+      for (String wal : this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId)) {
+        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+        walsByGroup.computeIfAbsent(walPrefix, p -> new TreeSet<>()).add(wal);
+      }
+    }
    return Collections.unmodifiableMap(walsByIdRecoveredQueues);
   }
 
@@ -1165,4 +1068,21 @@ public class ReplicationSourceManager {
   ReplicationQueueStorage getQueueStorage() {
     return queueStorage;
   }
+
+  private NavigableSet<String> getWalsToRemove(String queueId, String log, boolean inclusive) {
+    NavigableSet<String> walsToRemove = new TreeSet<>();
+    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
+    try {
+      this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId).forEach(wal -> {
+        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+        if (walPrefix.equals(logPrefix)) {
+          walsToRemove.add(wal);
+        }
+      });
+    } catch (ReplicationException e) {
+      // Just log the exception here, as the recovered replication source will try to cleanup again.
+      LOG.warn("Failed to read wals in queue {}", queueId, e);
+    }
+    return walsToRemove.headSet(log, inclusive);
+  }
 }
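
For readers following the change: the patch drops the two in-memory caches and instead derives the
"wal group prefix -> sorted wal names" view on demand from ReplicationQueueStorage, grouping the
returned WAL names with computeIfAbsent. The standalone sketch below only illustrates that grouping
pattern; the walPrefix() helper and the sample WAL names are simplified, made-up stand-ins (the real
code uses AbstractFSWALProvider.getWALPrefixFromWALName on names read via
queueStorage.getWALsInQueue), so treat it as an illustration rather than HBase code.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.NavigableSet;
    import java.util.TreeMap;
    import java.util.TreeSet;

    // Standalone sketch of the "group WAL names by prefix on demand" pattern used in the patch.
    public class WalGroupingSketch {

      // Assumption: a WAL name looks like "<prefix>.<timestamp>", so the prefix is everything
      // before the last dot. The real HBase helper handles more cases than this.
      static String walPrefix(String walName) {
        int idx = walName.lastIndexOf('.');
        return idx < 0 ? walName : walName.substring(0, idx);
      }

      // Build logPrefix -> sorted set of WAL names, the same shape the patch now builds from the
      // queue storage listing instead of the removed walsById/walsByIdRecoveredQueues caches.
      static Map<String, NavigableSet<String>> groupByPrefix(List<String> walsInQueue) {
        Map<String, NavigableSet<String>> walsByGroup = new TreeMap<>();
        for (String wal : walsInQueue) {
          walsByGroup.computeIfAbsent(walPrefix(wal), p -> new TreeSet<>()).add(wal);
        }
        return walsByGroup;
      }

      public static void main(String[] args) {
        // Hypothetical WAL names for two wal groups of one region server.
        List<String> wals = new ArrayList<>();
        wals.add("rs1%2C16020%2C1.1594600000001");
        wals.add("rs1%2C16020%2C1.1594600000002");
        wals.add("rs1%2C16020%2C1.meta.1594600000003");
        // Prints each wal group prefix with its sorted wal names.
        groupByPrefix(wals).forEach((prefix, names) -> System.out.println(prefix + " -> " + names));
      }
    }

The trade-off the commit makes is the usual cache-versus-source-of-truth one: callers such as
cleanOldLogs, refreshSources and getWALs now pay an extra read of the replication queue storage,
but there is no second copy of the WAL list to keep consistent under concurrent peer operations,
which is why the walsById/walsByIdRecoveredQueues synchronization notes could be deleted from the
class javadoc.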