Repository: hbase Updated Branches: refs/heads/master 15da74cce -> dd6f4525e
HBASE-20148 Make serial replication as a option for a peer instead of a table Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/dd6f4525 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/dd6f4525 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/dd6f4525 Branch: refs/heads/master Commit: dd6f4525e7a9eaceb6ed4f97059b2dd3c532d323 Parents: 15da74c Author: zhangduo <zhang...@apache.org> Authored: Fri Mar 9 15:00:59 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Sat Mar 10 09:04:44 2018 +0800 ---------------------------------------------------------------------- .../apache/hadoop/hbase/HTableDescriptor.java | 8 ----- .../hadoop/hbase/client/TableDescriptor.java | 20 +++++++----- .../hbase/client/TableDescriptorBuilder.java | 9 ------ .../replication/ReplicationPeerConfigUtil.java | 5 +++ .../replication/ReplicationPeerConfig.java | 32 +++++++++++++++----- .../ReplicationPeerConfigBuilder.java | 12 ++++++++ .../org/apache/hadoop/hbase/HConstants.java | 6 ---- .../src/main/protobuf/Replication.proto | 1 + .../hbase/replication/ReplicationUtils.java | 3 ++ .../master/assignment/RegionStateStore.java | 14 ++++----- .../hbase/replication/ScopeWALEntryFilter.java | 32 ++++++++++---------- .../regionserver/ReplicationSource.java | 4 +++ .../ReplicationSourceWALActionListener.java | 10 +----- .../ReplicationSourceWALReader.java | 6 ++-- .../regionserver/SerialReplicationChecker.java | 2 +- .../org/apache/hadoop/hbase/wal/WALKeyImpl.java | 8 ----- .../TestReplicationWALEntryFilters.java | 15 ++++++--- .../replication/TestSerialReplication.java | 9 +++--- 18 files changed, 104 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java index 3652d10..db8870d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java @@ -537,14 +537,6 @@ public class HTableDescriptor implements TableDescriptor, Comparable<HTableDescr } /** - * Return true if there are at least one cf whose replication scope is serial. - */ - @Override - public boolean hasSerialReplicationScope() { - return delegatee.hasSerialReplicationScope(); - } - - /** * Returns the configured replicas per region */ @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java index 3505175..1ec61a2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java @@ -24,7 +24,7 @@ import java.util.Comparator; import java.util.Iterator; import java.util.Map; import java.util.Set; - +import java.util.stream.Stream; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.util.Bytes; @@ -232,11 +232,6 @@ public interface TableDescriptor { boolean hasRegionMemStoreReplication(); /** - * @return true if there are at least one cf whose replication scope is serial. - */ - boolean hasSerialReplicationScope(); - - /** * Check if the compaction enable flag of the table is true. If flag is false * then no minor/major compactions will be done in real. * @@ -275,6 +270,16 @@ public interface TableDescriptor { boolean isReadOnly(); /** + * Check if any of the table's cfs' replication scope are set to + * {@link HConstants#REPLICATION_SCOPE_GLOBAL}. + * @return {@code true} if we have, otherwise {@code false}. + */ + default boolean hasGlobalReplicationScope() { + return Stream.of(getColumnFamilies()) + .anyMatch(cf -> cf.getScope() == HConstants.REPLICATION_SCOPE_GLOBAL); + } + + /** * Check if the table's cfs' replication scope matched with the replication state * @param enabled replication state * @return true if matched, otherwise false @@ -284,8 +289,7 @@ public interface TableDescriptor { boolean hasDisabled = false; for (ColumnFamilyDescriptor cf : getColumnFamilies()) { - if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL && - cf.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) { + if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) { hasDisabled = true; } else { hasEnabled = true; http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java index 0855f87..c1db64b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java @@ -1054,15 +1054,6 @@ public class TableDescriptorBuilder { } /** - * Return true if there are at least one cf whose replication scope is serial. - */ - @Override - public boolean hasSerialReplicationScope() { - return families.values().stream() - .anyMatch(column -> column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL); - } - - /** * Returns the configured replicas per region */ @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java index a234a9b..b1c1713 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java @@ -303,6 +303,10 @@ public final class ReplicationPeerConfigUtil { builder.setReplicateAllUserTables(peer.getReplicateAll()); } + if (peer.hasSerial()) { + builder.setSerial(peer.getSerial()); + } + Map<TableName, List<String>> excludeTableCFsMap = convert2Map(peer.getExcludeTableCfsList() .toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()])); if (excludeTableCFsMap != null) { @@ -357,6 +361,7 @@ public final class ReplicationPeerConfigUtil { builder.setBandwidth(peerConfig.getBandwidth()); builder.setReplicateAll(peerConfig.replicateAllUserTables()); + builder.setSerial(peerConfig.isSerial()); ReplicationProtos.TableCF[] excludeTableCFs = convert(peerConfig.getExcludeTableCFsMap()); if (excludeTableCFs != null) { http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index bf8d030..e0d9a4c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -46,6 +46,7 @@ public class ReplicationPeerConfig { private Map<TableName, ? extends Collection<String>> excludeTableCFsMap = null; private Set<String> excludeNamespaces = null; private long bandwidth = 0; + private final boolean serial; private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) { this.clusterKey = builder.clusterKey; @@ -64,6 +65,7 @@ public class ReplicationPeerConfig { builder.excludeNamespaces != null ? Collections.unmodifiableSet(builder.excludeNamespaces) : null; this.bandwidth = builder.bandwidth; + this.serial = builder.serial; } private Map<TableName, List<String>> @@ -82,6 +84,7 @@ public class ReplicationPeerConfig { public ReplicationPeerConfig() { this.peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR); this.configuration = new HashMap<>(0); + this.serial = false; } /** @@ -214,16 +217,20 @@ public class ReplicationPeerConfig { return new ReplicationPeerConfigBuilderImpl(); } + public boolean isSerial() { + return serial; + } + public static ReplicationPeerConfigBuilder newBuilder(ReplicationPeerConfig peerConfig) { ReplicationPeerConfigBuilderImpl builder = new ReplicationPeerConfigBuilderImpl(); builder.setClusterKey(peerConfig.getClusterKey()) - .setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl()) - .putAllPeerData(peerConfig.getPeerData()).putAllConfiguration(peerConfig.getConfiguration()) - .setTableCFsMap(peerConfig.getTableCFsMap()).setNamespaces(peerConfig.getNamespaces()) - .setReplicateAllUserTables(peerConfig.replicateAllUserTables()) - .setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap()) - .setExcludeNamespaces(peerConfig.getExcludeNamespaces()) - .setBandwidth(peerConfig.getBandwidth()); + .setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl()) + .putAllPeerData(peerConfig.getPeerData()).putAllConfiguration(peerConfig.getConfiguration()) + .setTableCFsMap(peerConfig.getTableCFsMap()).setNamespaces(peerConfig.getNamespaces()) + .setReplicateAllUserTables(peerConfig.replicateAllUserTables()) + .setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap()) + .setExcludeNamespaces(peerConfig.getExcludeNamespaces()) + .setBandwidth(peerConfig.getBandwidth()).setSerial(peerConfig.isSerial()); return builder; } @@ -250,6 +257,8 @@ public class ReplicationPeerConfig { private long bandwidth = 0; + private boolean serial = false; + @Override public ReplicationPeerConfigBuilder setClusterKey(String clusterKey) { this.clusterKey = clusterKey; @@ -313,6 +322,12 @@ public class ReplicationPeerConfig { } @Override + public ReplicationPeerConfigBuilder setSerial(boolean serial) { + this.serial = serial; + return this; + } + + @Override public ReplicationPeerConfig build() { // It would be nice to validate the configuration, but we have to work with "old" data // from ZK which makes it much more difficult. @@ -340,7 +355,8 @@ public class ReplicationPeerConfig { builder.append("tableCFs=").append(tableCFsMap.toString()).append(","); } } - builder.append("bandwidth=").append(bandwidth); + builder.append("bandwidth=").append(bandwidth).append(","); + builder.append("serial=").append(serial); return builder.toString(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java index 0b2f2e2..4c531c5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java @@ -138,6 +138,18 @@ public interface ReplicationPeerConfigBuilder { ReplicationPeerConfigBuilder setExcludeNamespaces(Set<String> namespaces); /** + * <p> + * Sets whether we should preserve order when replicating, i.e, serial replication. + * </p> + * <p> + * Default {@code false}. + * </p> + * @param serial {@code true} means preserve order, otherwise {@code false}. + * @return {@code this} + */ + ReplicationPeerConfigBuilder setSerial(boolean serial); + + /** * Builds the configuration object from the current state of {@code this}. * @return A {@link ReplicationPeerConfig} instance. */ http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 3dd0ac8..0039a56 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -647,12 +647,6 @@ public final class HConstants { public static final int REPLICATION_SCOPE_GLOBAL = 1; /** - * Scope tag for serially scoped data - * This data will be replicated to all peers by the order of sequence id. - */ - public static final int REPLICATION_SCOPE_SERIAL = 2; - - /** * Default cluster ID, cannot be used to identify a cluster so a key with * this value means it wasn't meant for replication. */ http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-protocol-shaded/src/main/protobuf/Replication.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/Replication.proto index 9f7b4c2..557b87c 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Replication.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Replication.proto @@ -48,6 +48,7 @@ message ReplicationPeer { optional bool replicate_all = 8; repeated TableCF exclude_table_cfs = 9; repeated bytes exclude_namespaces = 10; + optional bool serial = 11; } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java index 11507aa..2a6870a 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java @@ -115,6 +115,9 @@ public final class ReplicationUtils { if (rpc1.replicateAllUserTables() != rpc2.replicateAllUserTables()) { return false; } + if (rpc1.isSerial() != rpc2.isSerial()) { + return false; + } if (rpc1.replicateAllUserTables()) { return isNamespacesEqual(rpc1.getExcludeNamespaces(), rpc2.getExcludeNamespaces()) && isTableCFsEqual(rpc1.getExcludeTableCFsMap(), rpc2.getExcludeTableCFsMap()); http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java index 1ffc31f..c8017202 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java @@ -165,7 +165,7 @@ public class RegionStateStore { MetaTableAccessor.addLocation(put, regionLocation, openSeqNum, replicaId); // only update replication barrier for default replica if (regionInfo.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID && - hasSerialReplicationScope(regionInfo.getTable())) { + hasGlobalReplicationScope(regionInfo.getTable())) { MetaTableAccessor.addReplicationBarrier(put, openSeqNum); } info.append(", openSeqNum=").append(openSeqNum); @@ -224,7 +224,7 @@ public class RegionStateStore { ServerName serverName) throws IOException { TableDescriptor htd = getTableDescriptor(parent.getTable()); long parentOpenSeqNum = HConstants.NO_SEQNUM; - if (htd.hasSerialReplicationScope()) { + if (htd.hasGlobalReplicationScope()) { parentOpenSeqNum = getOpenSeqNumForParentRegion(parent); } MetaTableAccessor.splitRegion(master.getConnection(), parent, parentOpenSeqNum, hriA, hriB, @@ -239,7 +239,7 @@ public class RegionStateStore { TableDescriptor htd = getTableDescriptor(child.getTable()); long regionAOpenSeqNum = -1L; long regionBOpenSeqNum = -1L; - if (htd.hasSerialReplicationScope()) { + if (htd.hasGlobalReplicationScope()) { regionAOpenSeqNum = getOpenSeqNumForParentRegion(hriA); regionBOpenSeqNum = getOpenSeqNumForParentRegion(hriB); } @@ -261,12 +261,12 @@ public class RegionStateStore { // ========================================================================== // Table Descriptors helpers // ========================================================================== - private boolean hasSerialReplicationScope(TableName tableName) throws IOException { - return hasSerialReplicationScope(getTableDescriptor(tableName)); + private boolean hasGlobalReplicationScope(TableName tableName) throws IOException { + return hasGlobalReplicationScope(getTableDescriptor(tableName)); } - private boolean hasSerialReplicationScope(TableDescriptor htd) { - return htd != null ? htd.hasSerialReplicationScope() : false; + private boolean hasGlobalReplicationScope(TableDescriptor htd) { + return htd != null ? htd.hasGlobalReplicationScope() : false; } private int getRegionReplication(TableDescriptor htd) { http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java index 6a2fbcf..f8722eb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java @@ -37,31 +37,31 @@ public class ScopeWALEntryFilter implements WALEntryFilter, WALCellFilter { @Override public Entry filter(Entry entry) { - NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes(); - if (scopes == null || scopes.isEmpty()) { - return null; - } + // Do not filter out an entire entry by replication scopes. As now we support serial + // replication, the sequence id of a marker is also needed by upper layer. We will filter out + // all the cells in the filterCell method below if the replication scopes is null or empty. return entry; } + private boolean hasGlobalScope(NavigableMap<byte[], Integer> scopes, byte[] family) { + Integer scope = scopes.get(family); + return scope != null && scope.intValue() == HConstants.REPLICATION_SCOPE_GLOBAL; + } @Override public Cell filterCell(Entry entry, Cell cell) { - final NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes(); - // The scope will be null or empty if - // there's nothing to replicate in that WALEdit - byte[] fam = CellUtil.cloneFamily(cell); + NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes(); + if (scopes == null || scopes.isEmpty()) { + return null; + } + byte[] family = CellUtil.cloneFamily(cell); if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) { - cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() { + return bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() { @Override - public boolean apply(byte[] fam) { - return !scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL; + public boolean apply(byte[] family) { + return !hasGlobalScope(scopes, family); } }); - } else { - if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) { - return null; - } } - return cell; + return hasGlobalScope(scopes, family) ? cell : null; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 86e7f98..2f9cd56 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -382,6 +382,10 @@ public class ReplicationSource implements ReplicationSourceInterface { return replicationPeer.isPeerEnabled(); } + public boolean isSerial() { + return replicationPeer.getPeerConfig().isSerial(); + } + private void initialize() { int sleepMultiplier = 1; while (this.isSourceActive()) { http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java index 95fc6a0..27b25c4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java @@ -72,18 +72,10 @@ class ReplicationSourceWALActionListener implements WALActionsListener { if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) { return; } - WALKeyImpl keyImpl = (WALKeyImpl) logKey; - // For serial replication we need to count all the sequence ids even for markers, so here we - // always need to retain the replication scopes to let the replication wal reader to know that - // we need serial replication. The ScopeWALEntryFilter will help filtering out the cell for - // WALEdit.METAFAMILY. - if (keyImpl.hasSerialReplicationScope()) { - return; - } // For replay, or if all the cells are markers, do not need to store replication scope. if (logEdit.isReplay() || logEdit.getCells().stream().allMatch(c -> CellUtil.matchingFamily(c, WALEdit.METAFAMILY))) { - keyImpl.clearReplicationScope(); + ((WALKeyImpl) logKey).clearReplicationScope(); } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index ad3baaf..da92a09 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -186,9 +186,9 @@ public class ReplicationSourceWALReader extends Thread { new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()); do { Entry entry = entryStream.peek(); - boolean hasSerialReplicationScope = entry.getKey().hasSerialReplicationScope(); + boolean isSerial = source.isSerial(); boolean doFiltering = true; - if (hasSerialReplicationScope) { + if (isSerial) { if (firstCellInEntryBeforeFiltering == null) { assert !entry.getEdit().isEmpty() : "should not write empty edits"; // Used to locate the region record in meta table. In WAL we only have the table name and @@ -208,7 +208,7 @@ public class ReplicationSourceWALReader extends Thread { entry = filterEntry(entry); } if (entry != null) { - if (hasSerialReplicationScope) { + if (isSerial) { if (!serialReplicationChecker.canPush(entry, firstCellInEntryBeforeFiltering)) { if (batch.getLastWalPosition() > positionBefore) { // we have something that can push, break http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java index 9276359..b775d25 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java @@ -266,7 +266,7 @@ class SerialReplicationChecker { throws IOException, InterruptedException { byte[] row = CellUtil.cloneRow(firstCellInEdit); while (!canPush(entry, row)) { - LOG.debug("Can not push{}, wait", entry); + LOG.debug("Can not push {}, wait", entry); Thread.sleep(waitTimeMs); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java index ac23d1d..8828239 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java @@ -419,14 +419,6 @@ public class WALKeyImpl implements WALKey { setReplicationScope(null); } - public boolean hasSerialReplicationScope() { - if (replicationScope == null || replicationScope.isEmpty()) { - return false; - } - return replicationScope.values().stream() - .anyMatch(scope -> scope.intValue() == HConstants.REPLICATION_SCOPE_SERIAL); - } - /** * Marks that the cluster with the given clusterId has consumed the change */ http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java index 67a2551..f2c5e50 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.replication; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -32,9 +33,9 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparatorImpl; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; @@ -48,7 +49,7 @@ import org.junit.experimental.categories.Category; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; -@Category({ReplicationTests.class, SmallTests.class}) +@Category({ ReplicationTests.class, SmallTests.class }) public class TestReplicationWALEntryFilters { @ClassRule @@ -65,7 +66,8 @@ public class TestReplicationWALEntryFilters { SystemTableWALEntryFilter filter = new SystemTableWALEntryFilter(); // meta - WALKeyImpl key1 = new WALKeyImpl(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), + WALKeyImpl key1 = + new WALKeyImpl(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), TableName.META_TABLE_NAME, System.currentTimeMillis()); Entry metaEntry = new Entry(key1, null); @@ -96,12 +98,15 @@ public class TestReplicationWALEntryFilters { Entry userEntryEmpty = createEntry(null); // no scopes - assertEquals(null, filter.filter(userEntry)); + // now we will not filter out entries without a replication scope since serial replication still + // need the sequence id, but the cells will all be filtered out. + assertTrue(filter.filter(userEntry).getEdit().isEmpty()); // empty scopes + // ditto TreeMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); userEntry = createEntry(scopes, a, b); - assertEquals(null, filter.filter(userEntry)); + assertTrue(filter.filter(userEntry).getEdit().isEmpty()); // different scope scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java index bf6c0c8..f8efcf0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java @@ -156,7 +156,8 @@ public class TestSerialReplication { // add in disable state, so later when enabling it all sources will start push together. UTIL.getAdmin().addReplicationPeer(PEER_ID, ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase") - .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build(), + .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).setSerial(true) + .build(), false); } @@ -234,7 +235,7 @@ public class TestSerialReplication { TableName tableName = TableName.valueOf(name.getMethodName()); UTIL.getAdmin().createTable( TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder - .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_SERIAL).build()).build()); + .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build()); UTIL.waitTableAvailable(tableName); try (Table table = UTIL.getConnection().getTable(tableName)) { for (int i = 0; i < 100; i++) { @@ -273,7 +274,7 @@ public class TestSerialReplication { TableName tableName = TableName.valueOf(name.getMethodName()); UTIL.getAdmin().createTable( TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder - .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_SERIAL).build()).build()); + .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build()); UTIL.waitTableAvailable(tableName); try (Table table = UTIL.getConnection().getTable(tableName)) { for (int i = 0; i < 100; i++) { @@ -330,7 +331,7 @@ public class TestSerialReplication { UTIL.getAdmin().createTable( TableDescriptorBuilder.newBuilder(tableName) .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF) - .setScope(HConstants.REPLICATION_SCOPE_SERIAL).build()) + .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) .build(), new byte[][] { splitKey }); UTIL.waitTableAvailable(tableName);