This is an automated email from the ASF dual-hosted git repository. zghao pushed a commit to branch HBASE-24666 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit c315e684214e6f18fdfd2156333b0396b7dc7d29 Author: Guanghao Zhang <zg...@apache.org> AuthorDate: Wed Jul 8 14:29:08 2020 +0800 HBASE-24682 Refactor ReplicationSource#addHFileRefs method: move it to ReplicationSourceManager (#2020) Signed-off-by: Wellington Chevreuil <wchevre...@apache.org> --- .../regionserver/ReplicationSource.java | 38 +---------------- .../regionserver/ReplicationSourceInterface.java | 14 ------- .../regionserver/ReplicationSourceManager.java | 48 +++++++++++++++++++++- .../hbase/replication/ReplicationSourceDummy.java | 9 +--- 4 files changed, 49 insertions(+), 60 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index bf8127f..2716ade 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -28,7 +28,6 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -38,6 +37,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; + import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.hadoop.conf.Configuration; @@ -48,21 +48,17 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableDescriptors; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; import org.apache.hadoop.hbase.replication.ChainWALEntryFilter; import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; -import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -280,38 +276,6 @@ public class ReplicationSource implements ReplicationSourceInterface { } } - @Override - public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs) - throws ReplicationException { - String peerId = replicationPeer.getId(); - Set<String> namespaces = replicationPeer.getNamespaces(); - Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs(); - if (tableCFMap != null) { // All peers with TableCFs - List<String> tableCfs = tableCFMap.get(tableName); - if (tableCFMap.containsKey(tableName) - && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) { - this.queueStorage.addHFileRefs(peerId, pairs); - metrics.incrSizeOfHFileRefsQueue(pairs.size()); - } else { - LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}", - tableName, Bytes.toString(family), peerId); - } - } else if (namespaces != null) { // Only for set NAMESPACES peers - if (namespaces.contains(tableName.getNamespaceAsString())) { - this.queueStorage.addHFileRefs(peerId, pairs); - metrics.incrSizeOfHFileRefsQueue(pairs.size()); - } else { - LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}", - tableName, Bytes.toString(family), peerId); - } - } else { - // user has explicitly not defined any table cfs for replication, means replicate all the - // data - this.queueStorage.addHFileRefs(peerId, pairs); - metrics.incrSizeOfHFileRefsQueue(pairs.size()); - } - } - private ReplicationEndpoint createReplicationEndpoint() throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException { RegionServerCoprocessorHost rsServerHost = null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 0bd90cf..33a413f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -29,12 +29,9 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; -import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.yetus.audience.InterfaceAudience; @@ -63,17 +60,6 @@ public interface ReplicationSourceInterface { void enqueueLog(Path log); /** - * Add hfile names to the queue to be replicated. - * @param tableName Name of the table these files belongs to - * @param family Name of the family these files belong to - * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which - * will be added in the queue for replication} - * @throws ReplicationException If failed to add hfile references - */ - void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs) - throws ReplicationException; - - /** * Start the replication */ void startup(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 0940b5a..3869857 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider; @@ -173,6 +174,8 @@ public class ReplicationSourceManager implements ReplicationListener { private final long totalBufferLimit; private final MetricsReplicationGlobalSourceSource globalMetrics; + private final Map<String, MetricsSource> sourceMetrics = new HashMap<>(); + /** * Creates a replication manager and sets the watch on all the other registered region servers * @param queueStorage the interface for manipulating replication queues @@ -355,6 +358,7 @@ public class ReplicationSourceManager implements ReplicationListener { ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId); MetricsSource metrics = new MetricsSource(queueId); + sourceMetrics.put(queueId, metrics); // init replication source src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId, walFileLengthProvider, metrics); @@ -1139,7 +1143,49 @@ public class ReplicationSourceManager implements ReplicationListener { public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs) throws IOException { for (ReplicationSourceInterface source : this.sources.values()) { - throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs)); + throwIOExceptionWhenFail(() -> addHFileRefs(source.getPeerId(), tableName, family, pairs)); + } + } + + /** + * Add hfile names to the queue to be replicated. + * @param peerId the replication peer id + * @param tableName Name of the table these files belongs to + * @param family Name of the family these files belong to + * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which + * will be added in the queue for replication} + * @throws ReplicationException If failed to add hfile references + */ + private void addHFileRefs(String peerId, TableName tableName, byte[] family, + List<Pair<Path, Path>> pairs) throws ReplicationException { + // Only the normal replication source update here, its peerId is equals to queueId. + MetricsSource metrics = sourceMetrics.get(peerId); + ReplicationPeer replicationPeer = replicationPeers.getPeer(peerId); + Set<String> namespaces = replicationPeer.getNamespaces(); + Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs(); + if (tableCFMap != null) { // All peers with TableCFs + List<String> tableCfs = tableCFMap.get(tableName); + if (tableCFMap.containsKey(tableName) + && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) { + this.queueStorage.addHFileRefs(peerId, pairs); + metrics.incrSizeOfHFileRefsQueue(pairs.size()); + } else { + LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}", + tableName, Bytes.toString(family), peerId); + } + } else if (namespaces != null) { // Only for set NAMESPACES peers + if (namespaces.contains(tableName.getNamespaceAsString())) { + this.queueStorage.addHFileRefs(peerId, pairs); + metrics.incrSizeOfHFileRefsQueue(pairs.size()); + } else { + LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}", + tableName, Bytes.toString(family), peerId); + } + } else { + // user has explicitly not defined any table cfs for replication, means replicate all the + // data + this.queueStorage.addHFileRefs(peerId, pairs); + metrics.incrSizeOfHFileRefsQueue(pairs.size()); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index a361c44..781a1da 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -21,17 +21,16 @@ import java.io.IOException; import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; /** @@ -114,12 +113,6 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { } @Override - public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> files) - throws ReplicationException { - return; - } - - @Override public boolean isPeerEnabled() { return true; }