This is an automated email from the ASF dual-hosted git repository. sunxin pushed a commit to branch HBASE-24666 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit a62a4b1a3e3fa0def1ec0c211cf703f954f9bbe2 Author: Guanghao Zhang <zg...@apache.org> AuthorDate: Wed Jul 8 14:29:08 2020 +0800 HBASE-24682 Refactor ReplicationSource#addHFileRefs method: move it to ReplicationSourceManager (#2020) Signed-off-by: Wellington Chevreuil <wchevre...@apache.org> --- .../regionserver/ReplicationSource.java | 38 +---------------- .../regionserver/ReplicationSourceInterface.java | 14 ------- .../regionserver/ReplicationSourceManager.java | 47 +++++++++++++++++++++- .../hbase/replication/ReplicationSourceDummy.java | 9 +---- 4 files changed, 48 insertions(+), 60 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 3272cf1..fdf7d89 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -26,7 +26,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -35,6 +34,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; + import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -44,21 +44,17 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableDescriptors; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RSRpcServices; import 
org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; import org.apache.hadoop.hbase.replication.ChainWALEntryFilter; import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; -import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -260,38 +256,6 @@ public class ReplicationSource implements ReplicationSourceInterface { } } - @Override - public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs) - throws ReplicationException { - String peerId = replicationPeer.getId(); - Set<String> namespaces = replicationPeer.getNamespaces(); - Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs(); - if (tableCFMap != null) { // All peers with TableCFs - List<String> tableCfs = tableCFMap.get(tableName); - if (tableCFMap.containsKey(tableName) - && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) { - this.queueStorage.addHFileRefs(peerId, pairs); - metrics.incrSizeOfHFileRefsQueue(pairs.size()); - } else { - LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}", - tableName, Bytes.toString(family), peerId); - } - } else if (namespaces != null) { // Only for set NAMESPACES peers - if (namespaces.contains(tableName.getNamespaceAsString())) { - this.queueStorage.addHFileRefs(peerId, pairs); - 
metrics.incrSizeOfHFileRefsQueue(pairs.size()); - } else { - LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}", - tableName, Bytes.toString(family), peerId); - } - } else { - // user has explicitly not defined any table cfs for replication, means replicate all the - // data - this.queueStorage.addHFileRefs(peerId, pairs); - metrics.incrSizeOfHFileRefsQueue(pairs.size()); - } - } - private ReplicationEndpoint createReplicationEndpoint() throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException { RegionServerCoprocessorHost rsServerHost = null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 27e4b79..352cdd3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -28,12 +28,9 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; -import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.yetus.audience.InterfaceAudience; @@ -61,17 +58,6 @@ public interface ReplicationSourceInterface { void enqueueLog(Path log); /** - * Add hfile names to the queue to be replicated. 
- * @param tableName Name of the table these files belongs to - * @param family Name of the family these files belong to - * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which - * will be added in the queue for replication} - * @throws ReplicationException If failed to add hfile references - */ - void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs) - throws ReplicationException; - - /** * Start the replication */ ReplicationSourceInterface startup(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index a276b78..21979bb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider; @@ -198,6 +199,8 @@ public class ReplicationSourceManager implements ReplicationListener { */ AtomicReference<ReplicationSourceInterface> catalogReplicationSource = new AtomicReference<>(); + private final Map<String, MetricsSource> sourceMetrics = new HashMap<>(); + /** * Creates a replication manager and sets the watch on all the other registered region servers * @param queueStorage the interface for manipulating replication queues @@ -1154,7 +1157,49 @@ public class ReplicationSourceManager 
implements ReplicationListener { public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs) throws IOException { for (ReplicationSourceInterface source : this.sources.values()) { - throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs)); + throwIOExceptionWhenFail(() -> addHFileRefs(source.getPeerId(), tableName, family, pairs)); + } + } + + /** + * Add hfile names to the queue to be replicated. + * @param peerId the replication peer id + * @param tableName Name of the table these files belong to + * @param family Name of the family these files belong to + * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which + * will be added in the queue for replication} + * @throws ReplicationException If failed to add hfile references + */ + private void addHFileRefs(String peerId, TableName tableName, byte[] family, + List<Pair<Path, Path>> pairs) throws ReplicationException { + // Only the normal replication source is updated here; its peerId is equal to queueId.
+ MetricsSource metrics = sourceMetrics.get(peerId); + ReplicationPeer replicationPeer = replicationPeers.getPeer(peerId); + Set<String> namespaces = replicationPeer.getNamespaces(); + Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs(); + if (tableCFMap != null) { // All peers with TableCFs + List<String> tableCfs = tableCFMap.get(tableName); + if (tableCFMap.containsKey(tableName) + && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) { + this.queueStorage.addHFileRefs(peerId, pairs); + metrics.incrSizeOfHFileRefsQueue(pairs.size()); + } else { + LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}", + tableName, Bytes.toString(family), peerId); + } + } else if (namespaces != null) { // Only for set NAMESPACES peers + if (namespaces.contains(tableName.getNamespaceAsString())) { + this.queueStorage.addHFileRefs(peerId, pairs); + metrics.incrSizeOfHFileRefsQueue(pairs.size()); + } else { + LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}", + tableName, Bytes.toString(family), peerId); + } + } else { + // user has explicitly not defined any table cfs for replication, means replicate all the + // data + this.queueStorage.addHFileRefs(peerId, pairs); + metrics.incrSizeOfHFileRefsQueue(pairs.size()); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index cab01d6..4f656b1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -21,17 +21,16 @@ import java.io.IOException; import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import 
org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; /** @@ -115,12 +114,6 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { } @Override - public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> files) - throws ReplicationException { - return; - } - - @Override public boolean isPeerEnabled() { return true; }