This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit c315e684214e6f18fdfd2156333b0396b7dc7d29
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Wed Jul 8 14:29:08 2020 +0800

    HBASE-24682 Refactor ReplicationSource#addHFileRefs method: move it to ReplicationSourceManager (#2020)
    
    Signed-off-by: Wellington Chevreuil <wchevre...@apache.org>
---
 .../regionserver/ReplicationSource.java            | 38 +----------------
 .../regionserver/ReplicationSourceInterface.java   | 14 -------
 .../regionserver/ReplicationSourceManager.java     | 48 +++++++++++++++++++++-
 .../hbase/replication/ReplicationSourceDummy.java  |  9 +---
 4 files changed, 49 insertions(+), 60 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index bf8127f..2716ade 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -28,7 +28,6 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -38,6 +37,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.hadoop.conf.Configuration;
@@ -48,21 +48,17 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableDescriptors;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
 import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
 import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -280,38 +276,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
     }
   }
 
-  @Override
-  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
-      throws ReplicationException {
-    String peerId = replicationPeer.getId();
-    Set<String> namespaces = replicationPeer.getNamespaces();
-    Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
-    if (tableCFMap != null) { // All peers with TableCFs
-      List<String> tableCfs = tableCFMap.get(tableName);
-      if (tableCFMap.containsKey(tableName)
-          && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
-        this.queueStorage.addHFileRefs(peerId, pairs);
-        metrics.incrSizeOfHFileRefsQueue(pairs.size());
-      } else {
-        LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
-            tableName, Bytes.toString(family), peerId);
-      }
-    } else if (namespaces != null) { // Only for set NAMESPACES peers
-      if (namespaces.contains(tableName.getNamespaceAsString())) {
-        this.queueStorage.addHFileRefs(peerId, pairs);
-        metrics.incrSizeOfHFileRefsQueue(pairs.size());
-      } else {
-        LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
-            tableName, Bytes.toString(family), peerId);
-      }
-    } else {
-      // user has explicitly not defined any table cfs for replication, means replicate all the
-      // data
-      this.queueStorage.addHFileRefs(peerId, pairs);
-      metrics.incrSizeOfHFileRefsQueue(pairs.size());
-    }
-  }
-
   private ReplicationEndpoint createReplicationEndpoint()
      throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
     RegionServerCoprocessorHost rsServerHost = null;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 0bd90cf..33a413f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -29,12 +29,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -63,17 +60,6 @@ public interface ReplicationSourceInterface {
   void enqueueLog(Path log);
 
   /**
-   * Add hfile names to the queue to be replicated.
-   * @param tableName Name of the table these files belongs to
-   * @param family Name of the family these files belong to
-   * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
-   *          will be added in the queue for replication}
-   * @throws ReplicationException If failed to add hfile references
-   */
-  void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
-      throws ReplicationException;
-
-  /**
    * Start the replication
    */
   void startup();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 0940b5a..3869857 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
@@ -173,6 +174,8 @@ public class ReplicationSourceManager implements ReplicationListener {
   private final long totalBufferLimit;
   private final MetricsReplicationGlobalSourceSource globalMetrics;
 
+  private final Map<String, MetricsSource> sourceMetrics = new HashMap<>();
+
   /**
   * Creates a replication manager and sets the watch on all the other registered region servers
    * @param queueStorage the interface for manipulating replication queues
@@ -355,6 +358,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);
 
     MetricsSource metrics = new MetricsSource(queueId);
+    sourceMetrics.put(queueId, metrics);
     // init replication source
     src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
       walFileLengthProvider, metrics);
@@ -1139,7 +1143,49 @@ public class ReplicationSourceManager implements ReplicationListener {
   public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
       throws IOException {
     for (ReplicationSourceInterface source : this.sources.values()) {
-      throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs));
+      throwIOExceptionWhenFail(() -> addHFileRefs(source.getPeerId(), tableName, family, pairs));
+    }
+  }
+
+  /**
+   * Add hfile names to the queue to be replicated.
+   * @param peerId the replication peer id
+   * @param tableName Name of the table these files belong to
+   * @param family Name of the family these files belong to
+   * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
+   *          will be added in the queue for replication}
+   * @throws ReplicationException If failed to add hfile references
+   */
+  private void addHFileRefs(String peerId, TableName tableName, byte[] family,
+    List<Pair<Path, Path>> pairs) throws ReplicationException {
+    // Only normal replication sources are updated here; their peerId equals their queueId.
+    MetricsSource metrics = sourceMetrics.get(peerId);
+    ReplicationPeer replicationPeer = replicationPeers.getPeer(peerId);
+    Set<String> namespaces = replicationPeer.getNamespaces();
+    Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
+    if (tableCFMap != null) { // All peers with TableCFs
+      List<String> tableCfs = tableCFMap.get(tableName);
+      if (tableCFMap.containsKey(tableName)
+        && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
+        this.queueStorage.addHFileRefs(peerId, pairs);
+        metrics.incrSizeOfHFileRefsQueue(pairs.size());
+      } else {
+        LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
+          tableName, Bytes.toString(family), peerId);
+      }
+    } else if (namespaces != null) { // Only for peers with NAMESPACES set
+      if (namespaces.contains(tableName.getNamespaceAsString())) {
+        this.queueStorage.addHFileRefs(peerId, pairs);
+        metrics.incrSizeOfHFileRefsQueue(pairs.size());
+      } else {
+        LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
+          tableName, Bytes.toString(family), peerId);
+      }
+    } else {
+      // The user has not explicitly defined any table CFs for replication, which means
+      // replicate all the data
+      this.queueStorage.addHFileRefs(peerId, pairs);
+      metrics.incrSizeOfHFileRefsQueue(pairs.size());
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index a361c44..781a1da 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -21,17 +21,16 @@ import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
 /**
@@ -114,12 +113,6 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   }
 
   @Override
-  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> files)
-      throws ReplicationException {
-    return;
-  }
-
-  @Override
   public boolean isPeerEnabled() {
     return true;
   }
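
For readers who want the routing rule in isolation, the sketch below is a minimal, self-contained rendering of the decision order that the relocated addHFileRefs implements. It is an illustration only, not HBase code: the class name, the shouldReplicate helper, and the use of plain "namespace:table" strings in place of TableName and the peer config objects are all hypothetical, and the real method additionally enqueues the refs via queueStorage and bumps the per-source metrics. The order is: a table-CFs map, when configured, decides (an entry with a null CF list means every family of that table qualifies); otherwise a namespace set decides; otherwise the peer replicates everything.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class HFileRefRoutingSketch {

  // Hypothetical helper mirroring the decision order of the patched
  // ReplicationSourceManager#addHFileRefs; tables are "namespace:table" strings here.
  static boolean shouldReplicate(Map<String, List<String>> tableCFs, Set<String> namespaces,
      String namespace, String table, String family) {
    String qualified = namespace + ":" + table;
    if (tableCFs != null) { // peer configured with table-CFs: the map decides
      List<String> cfs = tableCFs.get(qualified);
      return tableCFs.containsKey(qualified) && (cfs == null || cfs.contains(family));
    } else if (namespaces != null) { // peer configured with namespaces only
      return namespaces.contains(namespace);
    }
    return true; // neither configured: replicate all data
  }

  public static void main(String[] args) {
    Map<String, List<String>> tableCFs = new HashMap<>();
    tableCFs.put("default:t1", Arrays.asList("cf1")); // only cf1 of t1 replicates
    tableCFs.put("default:t2", null);                 // null CF list: all families of t2

    System.out.println(shouldReplicate(tableCFs, null, "default", "t1", "cf1")); // true
    System.out.println(shouldReplicate(tableCFs, null, "default", "t1", "cf2")); // false
    System.out.println(shouldReplicate(tableCFs, null, "default", "t2", "cfX")); // true
    System.out.println(shouldReplicate(null, Set.of("ns1"), "ns1", "t3", "cf1")); // true
    System.out.println(shouldReplicate(null, null, "default", "t4", "cf1"));      // true
  }
}

One consequence of the move, visible in the diff, is that this per-peer filtering now runs once in the manager, with the peer looked up by id, instead of inside every ReplicationSource instance; that is also why the manager now caches a MetricsSource per queueId in the new sourceMetrics map.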
