This is an automated email from the ASF dual-hosted git repository.

sunxin pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 0c061bc6164fbb49afc7146095a1cf8b80a997bc
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Wed Jul 8 14:29:08 2020 +0800

    HBASE-24682 Refactor ReplicationSource#addHFileRefs method: move it to ReplicationSourceManager (#2020)
    
    Signed-off-by: Wellington Chevreuil <wchevre...@apache.org>
---
 .../regionserver/ReplicationSource.java            | 19 ++--------------
 .../regionserver/ReplicationSourceInterface.java   | 14 ------------
 .../regionserver/ReplicationSourceManager.java     | 26 +++++++++++++++++++++-
 .../hbase/replication/ReplicationSourceDummy.java  |  9 +-------
 4 files changed, 28 insertions(+), 40 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index d1268fa..a385ead 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -44,27 +45,24 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableDescriptors;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
 import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
 import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
 /**
@@ -264,19 +262,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
     return logQueue.getQueues();
   }
 
-  @Override
-  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
-      throws ReplicationException {
-    String peerId = replicationPeer.getId();
-    if (replicationPeer.getPeerConfig().needToReplicate(tableName, family)) {
-      this.queueStorage.addHFileRefs(peerId, pairs);
-      metrics.incrSizeOfHFileRefsQueue(pairs.size());
-    } else {
-      LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
-        tableName, Bytes.toString(family), peerId);
-    }
-  }
-
   private ReplicationEndpoint createReplicationEndpoint()
       throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
     RegionServerCoprocessorHost rsServerHost = null;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 27e4b79..352cdd3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -28,12 +28,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -61,17 +58,6 @@ public interface ReplicationSourceInterface {
   void enqueueLog(Path log);
 
   /**
-   * Add hfile names to the queue to be replicated.
-   * @param tableName Name of the table these files belongs to
-   * @param family Name of the family these files belong to
-   * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
-   *          will be added in the queue for replication}
-   * @throws ReplicationException If failed to add hfile references
-   */
-  void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
-      throws ReplicationException;
-
-  /**
    * Start the replication
    */
   ReplicationSourceInterface startup();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 73efcfe..ad7c033 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
@@ -1050,7 +1051,30 @@ public class ReplicationSourceManager {
   public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
       throws IOException {
     for (ReplicationSourceInterface source : this.sources.values()) {
-      throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs));
+      throwIOExceptionWhenFail(() -> addHFileRefs(source, tableName, family, pairs));
+    }
+  }
+
+  /**
+   * Add hfile names to the queue to be replicated.
+   * @param source the replication peer source
+   * @param tableName Name of the table these files belongs to
+   * @param family Name of the family these files belong to
+   * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
+   *          will be added in the queue for replication}
+   * @throws ReplicationException If failed to add hfile references
+   */
+  private void addHFileRefs(ReplicationSourceInterface source, TableName tableName, byte[] family,
+    List<Pair<Path, Path>> pairs) throws ReplicationException {
+    String peerId = source.getPeerId();
+    // Only the normal replication source update here, its peerId is equals to queueId.
+    ReplicationPeer replicationPeer = replicationPeers.getPeer(peerId);
+    if (replicationPeer.getPeerConfig().needToReplicate(tableName, family)) {
+      this.queueStorage.addHFileRefs(peerId, pairs);
+      source.getSourceMetrics().incrSizeOfHFileRefsQueue(pairs.size());
+    } else {
+      LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
+        tableName, Bytes.toString(family), peerId);
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index cab01d6..4f656b1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -21,17 +21,16 @@ import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
 /**
@@ -115,12 +114,6 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   }
 
   @Override
-  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> files)
-      throws ReplicationException {
-    return;
-  }
-
-  @Override
   public boolean isPeerEnabled() {
     return true;
   }

Reply via email to