This is an automated email from the ASF dual-hosted git repository.

sunxin pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 8481d547fd14493570e2b32f6b95f0a5c0e15536
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Mon Jul 13 17:35:32 2020 +0800

    HBASE-24681 Remove the cache walsById/walsByIdRecoveredQueues from ReplicationSourceManager (#2019)
    
    Signed-off-by: Wellington Chevreuil <wchevre...@apache.org>
---
 .../regionserver/ReplicationSourceManager.java     | 204 +++++++--------------
 1 file changed, 62 insertions(+), 142 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index ad7c033..db12c00 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -93,30 +93,6 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
  * <li>No need synchronized on {@link #sources}. {@link #sources} is a 
ConcurrentHashMap and there
  * is a Lock for peer id in {@link PeerProcedureHandlerImpl}. So there is no 
race for peer
  * operations.</li>
- * <li>Need synchronized on {@link #walsById}. There are four methods which 
modify it,
- * {@link #addPeer(String)}, {@link #removePeer(String)},
- * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and 
{@link #preLogRoll(Path)}.
- * {@link #walsById} is a ConcurrentHashMap and there is a Lock for peer id in
- * {@link PeerProcedureHandlerImpl}. So there is no race between {@link 
#addPeer(String)} and
- * {@link #removePeer(String)}. {@link #cleanOldLogs(String, boolean, 
ReplicationSourceInterface)}
- * is called by {@link ReplicationSourceInterface}. So no race with {@link 
#addPeer(String)}.
- * {@link #removePeer(String)} will terminate the {@link 
ReplicationSourceInterface} firstly, then
- * remove the wals from {@link #walsById}. So no race with {@link 
#removePeer(String)}. The only
- * case need synchronized is {@link #cleanOldLogs(String, boolean, 
ReplicationSourceInterface)} and
- * {@link #preLogRoll(Path)}.</li>
- * <li>No need synchronized on {@link #walsByIdRecoveredQueues}. There are 
three methods which
- * modify it, {@link #removePeer(String)} ,
- * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
- * {@link ReplicationSourceManager#claimQueue(ServerName, String)}.
- * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} is 
called by
- * {@link ReplicationSourceInterface}. {@link #removePeer(String)} will 
terminate the
- * {@link ReplicationSourceInterface} firstly, then remove the wals from
- * {@link #walsByIdRecoveredQueues}. And
- * {@link ReplicationSourceManager#claimQueue(ServerName, String)} will add 
the wals to
- * {@link #walsByIdRecoveredQueues} firstly, then start up a {@link 
ReplicationSourceInterface}. So
- * there is no race here. For {@link 
ReplicationSourceManager#claimQueue(ServerName, String)} and
- * {@link #removePeer(String)}, there is already synchronized on {@link 
#oldsources}. So no need
- * synchronized on {@link #walsByIdRecoveredQueues}.</li>
  * <li>Need synchronized on {@link #latestPaths} to avoid the new open source 
miss new log.</li>
  * <li>Need synchronized on {@link #oldsources} to avoid adding recovered 
source for the
  * to-be-removed peer.</li>
@@ -144,15 +120,6 @@ public class ReplicationSourceManager {
   // All about stopping
   private final Server server;
 
-  // All logs we are currently tracking
-  // Index structure of the map is: queue_id->logPrefix/logGroup->logs
-  // For normal replication source, the peer id is same with the queue id
-  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> 
walsById;
-  // Logs for recovered sources we are currently tracking
-  // the map is: queue_id->logPrefix/logGroup->logs
-  // For recovered source, the queue id's format is peer_id-servername-*
-  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> 
walsByIdRecoveredQueues;
-
   private final SyncReplicationPeerMappingManager 
syncReplicationPeerMappingManager;
 
   private final Configuration conf;
@@ -212,8 +179,6 @@ public class ReplicationSourceManager {
     this.queueStorage = queueStorage;
     this.replicationPeers = replicationPeers;
     this.server = server;
-    this.walsById = new ConcurrentHashMap<>();
-    this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
     this.oldsources = new ArrayList<>();
     this.conf = conf;
     this.fs = fs;
@@ -322,7 +287,6 @@ public class ReplicationSourceManager {
       // Delete queue from storage and memory and queue id is same with peer 
id for normal
       // source
       deleteQueue(peerId);
-      this.walsById.remove(peerId);
     }
     ReplicationPeerConfig peerConfig = peer.getPeerConfig();
     if (peerConfig.isSyncReplication()) {
@@ -364,15 +328,10 @@ public class ReplicationSourceManager {
     // synchronized on latestPaths to avoid missing the new log
     synchronized (this.latestPaths) {
       this.sources.put(peerId, src);
-      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
-      this.walsById.put(peerId, walsByGroup);
       // Add the latest wal to that source's queue
       if (!latestPaths.isEmpty()) {
         for (Map.Entry<String, Path> walPrefixAndPath : 
latestPaths.entrySet()) {
           Path walPath = walPrefixAndPath.getValue();
-          NavigableSet<String> wals = new TreeSet<>();
-          wals.add(walPath.getName());
-          walsByGroup.put(walPrefixAndPath.getKey(), wals);
           // Abort RS and throw exception to make add peer failed
           abortAndThrowIOExceptionWhenFail(
             () -> this.queueStorage.addWAL(server.getServerName(), peerId, 
walPath.getName()));
@@ -426,7 +385,10 @@ public class ReplicationSourceManager {
       // map from walsById since later we may fail to delete them from the 
replication queue
       // storage, and when we retry next time, we can not know the wal files 
that need to be deleted
       // from the replication queue storage.
-      walsById.get(peerId).forEach((k, v) -> wals.put(k, new TreeSet<>(v)));
+      this.queueStorage.getWALsInQueue(this.server.getServerName(), 
peerId).forEach(wal -> {
+        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+        wals.computeIfAbsent(walPrefix, p -> new TreeSet<>()).add(wal);
+      });
     }
     LOG.info("Startup replication source for " + src.getPeerId());
     src.startup();
@@ -435,15 +397,6 @@ public class ReplicationSourceManager {
         queueStorage.removeWAL(server.getServerName(), peerId, wal);
       }
     }
-    synchronized (walsById) {
-      Map<String, NavigableSet<String>> oldWals = walsById.get(peerId);
-      wals.forEach((k, v) -> {
-        NavigableSet<String> walsByGroup = oldWals.get(k);
-        if (walsByGroup != null) {
-          walsByGroup.removeAll(v);
-        }
-      });
-    }
     // synchronized on oldsources to avoid race with NodeFailoverWorker. Since 
NodeFailoverWorker is
     // a background task, we will delete the file from replication queue 
storage under the lock to
     // simplify the logic.
@@ -455,7 +408,6 @@ public class ReplicationSourceManager {
           oldSource.terminate(terminateMessage);
           oldSource.getSourceMetrics().clear();
           queueStorage.removeQueue(server.getServerName(), queueId);
-          walsByIdRecoveredQueues.remove(queueId);
           iter.remove();
         }
       }
@@ -468,7 +420,7 @@ public class ReplicationSourceManager {
    * replication queue storage and only to enqueue all logs to the new 
replication source
    * @param peerId the id of the replication peer
    */
-  public void refreshSources(String peerId) throws IOException {
+  public void refreshSources(String peerId) throws ReplicationException, 
IOException {
     String terminateMessage = "Peer " + peerId +
       " state or config changed. Will close the previous replication source 
and open a new one";
     ReplicationPeer peer = replicationPeers.getPeer(peerId);
@@ -481,9 +433,8 @@ public class ReplicationSourceManager {
         // Do not clear metrics
         toRemove.terminate(terminateMessage, null, false);
       }
-      for (NavigableSet<String> walsByGroup : walsById.get(peerId).values()) {
-        walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
-      }
+      this.queueStorage.getWALsInQueue(this.server.getServerName(), peerId)
+        .forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
     }
     LOG.info("Startup replication source for " + src.getPeerId());
     src.startup();
@@ -504,9 +455,8 @@ public class ReplicationSourceManager {
       for (String queueId : previousQueueIds) {
         ReplicationSourceInterface recoveredReplicationSource = 
createSource(queueId, peer);
         this.oldsources.add(recoveredReplicationSource);
-        for (SortedSet<String> walsByGroup : 
walsByIdRecoveredQueues.get(queueId).values()) {
-          walsByGroup.forEach(wal -> recoveredReplicationSource.enqueueLog(new 
Path(wal)));
-        }
+        this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId)
+          .forEach(wal -> recoveredReplicationSource.enqueueLog(new 
Path(wal)));
         toStartup.add(recoveredReplicationSource);
       }
     }
@@ -526,7 +476,6 @@ public class ReplicationSourceManager {
     LOG.info("Done with the recovered queue {}", src.getQueueId());
     // Delete queue from storage and memory
     deleteQueue(src.getQueueId());
-    this.walsByIdRecoveredQueues.remove(src.getQueueId());
     return true;
   }
 
@@ -549,8 +498,6 @@ public class ReplicationSourceManager {
     this.sources.remove(src.getPeerId());
     // Delete queue from storage and memory
     deleteQueue(src.getQueueId());
-    this.walsById.remove(src.getQueueId());
-
   }
 
   /**
@@ -635,42 +582,19 @@ public class ReplicationSourceManager {
    * @param inclusive whether we should also remove the given log file
    * @param source the replication source
    */
-  void cleanOldLogs(String log, boolean inclusive, ReplicationSourceInterface 
source) {
-    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
-    if (source.isRecovered()) {
-      NavigableSet<String> wals = 
walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix);
-      if (wals != null) {
-        NavigableSet<String> walsToRemove = wals.headSet(log, inclusive);
-        if (walsToRemove.isEmpty()) {
-          return;
-        }
-        cleanOldLogs(walsToRemove, source);
-        walsToRemove.clear();
-      }
-    } else {
-      NavigableSet<String> wals;
-      NavigableSet<String> walsToRemove;
-      // synchronized on walsById to avoid race with preLogRoll
-      synchronized (this.walsById) {
-        wals = walsById.get(source.getQueueId()).get(logPrefix);
-        if (wals == null) {
-          return;
-        }
-        walsToRemove = wals.headSet(log, inclusive);
-        if (walsToRemove.isEmpty()) {
-          return;
-        }
-        walsToRemove = new TreeSet<>(walsToRemove);
-      }
-      // cleanOldLogs may spend some time, especially for sync replication 
where we may want to
-      // remove remote wals as the remote cluster may have already been down, 
so we do it outside
-      // the lock to avoid block preLogRoll
-      cleanOldLogs(walsToRemove, source);
-      // now let's remove the files in the set
-      synchronized (this.walsById) {
-        wals.removeAll(walsToRemove);
-      }
+  void cleanOldLogs(String log, boolean inclusive,
+    ReplicationSourceInterface source) {
+    NavigableSet<String> walsToRemove;
+    synchronized (this.latestPaths) {
+      walsToRemove = getWalsToRemove(source.getQueueId(), log, inclusive);
+    }
+    if (walsToRemove.isEmpty()) {
+      return;
     }
+    // cleanOldLogs may spend some time, especially for sync replication where we may want to
+    // remove remote wals as the remote cluster may have already been down, so we do it outside
+    // the lock to avoid block preLogRoll
+    cleanOldLogs(walsToRemove, source);
   }
 
   private void removeRemoteWALs(String peerId, String remoteWALDir, 
Collection<String> wals)
@@ -750,37 +674,6 @@ public class ReplicationSourceManager {
         abortAndThrowIOExceptionWhenFail(
           () -> this.queueStorage.addWAL(server.getServerName(), 
source.getQueueId(), logName));
       }
-
-      // synchronized on walsById to avoid race with cleanOldLogs
-      synchronized (this.walsById) {
-        // Update walsById map
-        for (Map.Entry<String, Map<String, NavigableSet<String>>> entry : 
this.walsById
-          .entrySet()) {
-          String peerId = entry.getKey();
-          Map<String, NavigableSet<String>> walsByPrefix = entry.getValue();
-          boolean existingPrefix = false;
-          for (Map.Entry<String, NavigableSet<String>> walsEntry : 
walsByPrefix.entrySet()) {
-            SortedSet<String> wals = walsEntry.getValue();
-            if (this.sources.isEmpty()) {
-              // If there's no slaves, don't need to keep the old wals since
-              // we only consider the last one when a new slave comes in
-              wals.clear();
-            }
-            if (logPrefix.equals(walsEntry.getKey())) {
-              wals.add(logName);
-              existingPrefix = true;
-            }
-          }
-          if (!existingPrefix) {
-            // The new log belongs to a new group, add it into this peer
-            LOG.debug("Start tracking logs for wal group {} for peer {}", 
logPrefix, peerId);
-            NavigableSet<String> wals = new TreeSet<>();
-            wals.add(logName);
-            walsByPrefix.put(logPrefix, wals);
-          }
-        }
-      }
-
       // Add to latestPaths
       latestPaths.put(logPrefix, newLog);
     }
@@ -887,18 +780,6 @@ public class ReplicationSourceManager {
           return;
         }
       }
-      // track sources in walsByIdRecoveredQueues
-      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
-      walsByIdRecoveredQueues.put(queueId, walsByGroup);
-      for (String wal : walsSet) {
-        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
-        NavigableSet<String> wals = walsByGroup.get(walPrefix);
-        if (wals == null) {
-          wals = new TreeSet<>();
-          walsByGroup.put(walPrefix, wals);
-        }
-        wals.add(wal);
-      }
       oldsources.add(src);
       LOG.info("Added source for recovered queue {}", src.getQueueId());
       for (String wal : walsSet) {
@@ -926,7 +807,18 @@ public class ReplicationSourceManager {
    * Get a copy of the wals of the normal sources on this rs
    * @return a sorted set of wal names
    */
-  public Map<String, Map<String, NavigableSet<String>>> getWALs() {
+  public Map<String, Map<String, NavigableSet<String>>> getWALs()
+    throws ReplicationException {
+    Map<String, Map<String, NavigableSet<String>>> walsById = new HashMap<>();
+    for (ReplicationSourceInterface source : sources.values()) {
+      String queueId = source.getQueueId();
+      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
+      walsById.put(queueId, walsByGroup);
+      for (String wal : 
this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId)) {
+        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+        walsByGroup.computeIfAbsent(walPrefix, p -> new TreeSet<>()).add(wal);
+      }
+    }
     return Collections.unmodifiableMap(walsById);
   }
 
@@ -934,7 +826,18 @@ public class ReplicationSourceManager {
    * Get a copy of the wals of the recovered sources on this rs
    * @return a sorted set of wal names
    */
-  Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues() {
+  Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues()
+    throws ReplicationException {
+    Map<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues = 
new HashMap<>();
+    for (ReplicationSourceInterface source : oldsources) {
+      String queueId = source.getQueueId();
+      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
+      walsByIdRecoveredQueues.put(queueId, walsByGroup);
+      for (String wal : 
this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId)) {
+        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+        walsByGroup.computeIfAbsent(walPrefix, p -> new TreeSet<>()).add(wal);
+      }
+    }
     return Collections.unmodifiableMap(walsByIdRecoveredQueues);
   }
 
@@ -1165,4 +1068,21 @@ public class ReplicationSourceManager {
   ReplicationQueueStorage getQueueStorage() {
     return queueStorage;
   }
+
+  private NavigableSet<String> getWalsToRemove(String queueId, String log, 
boolean inclusive) {
+    NavigableSet<String> walsToRemove = new TreeSet<>();
+    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
+    try {
+      this.queueStorage.getWALsInQueue(this.server.getServerName(), 
queueId).forEach(wal -> {
+        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+        if (walPrefix.equals(logPrefix)) {
+          walsToRemove.add(wal);
+        }
+      });
+    } catch (ReplicationException e) {
+      // Just log the exception here, as the recovered replication source will try to cleanup again.
+      LOG.warn("Failed to read wals in queue {}", queueId, e);
+    }
+    return walsToRemove.headSet(log, inclusive);
+  }
 }

Reply via email to