Repository: hbase
Updated Branches:
  refs/heads/master 15da74cce -> dd6f4525e


HBASE-20148 Make serial replication as a option for a peer instead of a table


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/dd6f4525
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/dd6f4525
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/dd6f4525

Branch: refs/heads/master
Commit: dd6f4525e7a9eaceb6ed4f97059b2dd3c532d323
Parents: 15da74c
Author: zhangduo <zhang...@apache.org>
Authored: Fri Mar 9 15:00:59 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Sat Mar 10 09:04:44 2018 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/HTableDescriptor.java   |  8 -----
 .../hadoop/hbase/client/TableDescriptor.java    | 20 +++++++-----
 .../hbase/client/TableDescriptorBuilder.java    |  9 ------
 .../replication/ReplicationPeerConfigUtil.java  |  5 +++
 .../replication/ReplicationPeerConfig.java      | 32 +++++++++++++++-----
 .../ReplicationPeerConfigBuilder.java           | 12 ++++++++
 .../org/apache/hadoop/hbase/HConstants.java     |  6 ----
 .../src/main/protobuf/Replication.proto         |  1 +
 .../hbase/replication/ReplicationUtils.java     |  3 ++
 .../master/assignment/RegionStateStore.java     | 14 ++++-----
 .../hbase/replication/ScopeWALEntryFilter.java  | 32 ++++++++++----------
 .../regionserver/ReplicationSource.java         |  4 +++
 .../ReplicationSourceWALActionListener.java     | 10 +-----
 .../ReplicationSourceWALReader.java             |  6 ++--
 .../regionserver/SerialReplicationChecker.java  |  2 +-
 .../org/apache/hadoop/hbase/wal/WALKeyImpl.java |  8 -----
 .../TestReplicationWALEntryFilters.java         | 15 ++++++---
 .../replication/TestSerialReplication.java      |  9 +++---
 18 files changed, 104 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
index 3652d10..db8870d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
@@ -537,14 +537,6 @@ public class HTableDescriptor implements TableDescriptor, Comparable<HTableDescr
   }
 
   /**
-   * Return true if there are at least one cf whose replication scope is serial.
-   */
-  @Override
-  public boolean hasSerialReplicationScope() {
-    return delegatee.hasSerialReplicationScope();
-  }
-
-  /**
    * Returns the configured replicas per region
    */
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
index 3505175..1ec61a2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
@@ -24,7 +24,7 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
-
+import java.util.stream.Stream;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -232,11 +232,6 @@ public interface TableDescriptor {
   boolean hasRegionMemStoreReplication();
 
   /**
-   * @return true if there are at least one cf whose replication scope is serial.
-   */
-  boolean hasSerialReplicationScope();
-
-  /**
    * Check if the compaction enable flag of the table is true. If flag is false
    * then no minor/major compactions will be done in real.
    *
@@ -275,6 +270,16 @@ public interface TableDescriptor {
   boolean isReadOnly();
 
   /**
+   * Check if any of the table's cfs' replication scope are set to
+   * {@link HConstants#REPLICATION_SCOPE_GLOBAL}.
+   * @return {@code true} if we have, otherwise {@code false}.
+   */
+  default boolean hasGlobalReplicationScope() {
+    return Stream.of(getColumnFamilies())
+      .anyMatch(cf -> cf.getScope() == HConstants.REPLICATION_SCOPE_GLOBAL);
+  }
+
+  /**
   * Check if the table's cfs' replication scope matched with the replication state
    * @param enabled replication state
    * @return true if matched, otherwise false
@@ -284,8 +289,7 @@ public interface TableDescriptor {
     boolean hasDisabled = false;
 
     for (ColumnFamilyDescriptor cf : getColumnFamilies()) {
-      if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL &&
-        cf.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
+      if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
         hasDisabled = true;
       } else {
         hasEnabled = true;
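
For readers following along, the new default method can be exercised with just the public builder APIs that appear elsewhere in this patch. A minimal sketch (table and family names are invented for illustration):

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;

public class GlobalScopeCheckSketch {
  public static void main(String[] args) {
    // One family replicated with global scope, one kept local.
    TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("demo"))
      .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("repl"))
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("local"))
        .setScope(HConstants.REPLICATION_SCOPE_LOCAL).build())
      .build();

    // true: at least one family carries REPLICATION_SCOPE_GLOBAL.
    System.out.println(td.hasGlobalReplicationScope());
  }
}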

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
index 0855f87..c1db64b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
@@ -1054,15 +1054,6 @@ public class TableDescriptorBuilder {
     }
 
     /**
-     * Return true if there are at least one cf whose replication scope is serial.
-     */
-    @Override
-    public boolean hasSerialReplicationScope() {
-      return families.values().stream()
-        .anyMatch(column -> column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL);
-    }
-
-    /**
      * Returns the configured replicas per region
      */
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
index a234a9b..b1c1713 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
@@ -303,6 +303,10 @@ public final class ReplicationPeerConfigUtil {
       builder.setReplicateAllUserTables(peer.getReplicateAll());
     }
 
+    if (peer.hasSerial()) {
+      builder.setSerial(peer.getSerial());
+    }
+
    Map<TableName, List<String>> excludeTableCFsMap = convert2Map(peer.getExcludeTableCfsList()
        .toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()]));
     if (excludeTableCFsMap != null) {
@@ -357,6 +361,7 @@ public final class ReplicationPeerConfigUtil {
 
     builder.setBandwidth(peerConfig.getBandwidth());
     builder.setReplicateAll(peerConfig.replicateAllUserTables());
+    builder.setSerial(peerConfig.isSerial());
 
    ReplicationProtos.TableCF[] excludeTableCFs = convert(peerConfig.getExcludeTableCFsMap());
     if (excludeTableCFs != null) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index bf8d030..e0d9a4c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -46,6 +46,7 @@ public class ReplicationPeerConfig {
  private Map<TableName, ? extends Collection<String>> excludeTableCFsMap = null;
   private Set<String> excludeNamespaces = null;
   private long bandwidth = 0;
+  private final boolean serial;
 
   private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) {
     this.clusterKey = builder.clusterKey;
@@ -64,6 +65,7 @@ public class ReplicationPeerConfig {
        builder.excludeNamespaces != null ? Collections.unmodifiableSet(builder.excludeNamespaces)
             : null;
     this.bandwidth = builder.bandwidth;
+    this.serial = builder.serial;
   }
 
   private Map<TableName, List<String>>
@@ -82,6 +84,7 @@ public class ReplicationPeerConfig {
   public ReplicationPeerConfig() {
     this.peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR);
     this.configuration = new HashMap<>(0);
+    this.serial = false;
   }
 
   /**
@@ -214,16 +217,20 @@ public class ReplicationPeerConfig {
     return new ReplicationPeerConfigBuilderImpl();
   }
 
+  public boolean isSerial() {
+    return serial;
+  }
+
  public static ReplicationPeerConfigBuilder newBuilder(ReplicationPeerConfig peerConfig) {
    ReplicationPeerConfigBuilderImpl builder = new ReplicationPeerConfigBuilderImpl();
     builder.setClusterKey(peerConfig.getClusterKey())
-        .setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl())
-        .putAllPeerData(peerConfig.getPeerData()).putAllConfiguration(peerConfig.getConfiguration())
-        .setTableCFsMap(peerConfig.getTableCFsMap()).setNamespaces(peerConfig.getNamespaces())
-        .setReplicateAllUserTables(peerConfig.replicateAllUserTables())
-        .setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap())
-        .setExcludeNamespaces(peerConfig.getExcludeNamespaces())
-        .setBandwidth(peerConfig.getBandwidth());
+      .setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl())
+      .putAllPeerData(peerConfig.getPeerData()).putAllConfiguration(peerConfig.getConfiguration())
+      .setTableCFsMap(peerConfig.getTableCFsMap()).setNamespaces(peerConfig.getNamespaces())
+      .setReplicateAllUserTables(peerConfig.replicateAllUserTables())
+      .setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap())
+      .setExcludeNamespaces(peerConfig.getExcludeNamespaces())
+      .setBandwidth(peerConfig.getBandwidth()).setSerial(peerConfig.isSerial());
     return builder;
   }
 
@@ -250,6 +257,8 @@ public class ReplicationPeerConfig {
 
     private long bandwidth = 0;
 
+    private boolean serial = false;
+
     @Override
     public ReplicationPeerConfigBuilder setClusterKey(String clusterKey) {
       this.clusterKey = clusterKey;
@@ -313,6 +322,12 @@ public class ReplicationPeerConfig {
     }
 
     @Override
+    public ReplicationPeerConfigBuilder setSerial(boolean serial) {
+      this.serial = serial;
+      return this;
+    }
+
+    @Override
     public ReplicationPeerConfig build() {
      // It would be nice to validate the configuration, but we have to work with "old" data
       // from ZK which makes it much more difficult.
@@ -340,7 +355,8 @@ public class ReplicationPeerConfig {
         builder.append("tableCFs=").append(tableCFsMap.toString()).append(",");
       }
     }
-    builder.append("bandwidth=").append(bandwidth);
+    builder.append("bandwidth=").append(bandwidth).append(",");
+    builder.append("serial=").append(serial);
     return builder.toString();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java
index 0b2f2e2..4c531c5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java
@@ -138,6 +138,18 @@ public interface ReplicationPeerConfigBuilder {
   ReplicationPeerConfigBuilder setExcludeNamespaces(Set<String> namespaces);
 
   /**
+   * <p>
+   * Sets whether we should preserve order when replicating, i.e, serial replication.
+   * </p>
+   * <p>
+   * Default {@code false}.
+   * </p>
+   * @param serial {@code true} means preserve order, otherwise {@code false}.
+   * @return {@code this}
+   */
+  ReplicationPeerConfigBuilder setSerial(boolean serial);
+
+  /**
    * Builds the configuration object from the current state of {@code this}.
    * @return A {@link ReplicationPeerConfig} instance.
    */
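
To illustrate the new option end to end, here is a small sketch using only the ReplicationPeerConfig API touched by this patch (the cluster key is a placeholder value, borrowed from the test further below):

import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

public class SerialFlagBuilderSketch {
  public static void main(String[] args) {
    // Request order-preserving (serial) replication on the peer config.
    ReplicationPeerConfig config = ReplicationPeerConfig.newBuilder()
      .setClusterKey("127.0.0.1:2181:/hbase") // placeholder cluster key
      .setSerial(true)
      .build();
    System.out.println(config.isSerial()); // true

    // The copy-style newBuilder(peerConfig) shown above carries the flag over as well,
    // and toString() now reports it.
    ReplicationPeerConfig copy = ReplicationPeerConfig.newBuilder(config).build();
    System.out.println(copy.isSerial()); // true
    System.out.println(copy);            // ...,bandwidth=0,serial=true
  }
}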

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 3dd0ac8..0039a56 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -647,12 +647,6 @@ public final class HConstants {
   public static final int REPLICATION_SCOPE_GLOBAL = 1;
 
   /**
-   * Scope tag for serially scoped data
-   * This data will be replicated to all peers by the order of sequence id.
-   */
-  public static final int REPLICATION_SCOPE_SERIAL = 2;
-
-  /**
    * Default cluster ID, cannot be used to identify a cluster so a key with
    * this value means it wasn't meant for replication.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-protocol-shaded/src/main/protobuf/Replication.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/Replication.proto
index 9f7b4c2..557b87c 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Replication.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Replication.proto
@@ -48,6 +48,7 @@ message ReplicationPeer {
   optional bool replicate_all = 8;
   repeated TableCF exclude_table_cfs = 9;
   repeated bytes exclude_namespaces = 10;
+  optional bool serial = 11;
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index 11507aa..2a6870a 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -115,6 +115,9 @@ public final class ReplicationUtils {
     if (rpc1.replicateAllUserTables() != rpc2.replicateAllUserTables()) {
       return false;
     }
+    if (rpc1.isSerial() != rpc2.isSerial()) {
+      return false;
+    }
     if (rpc1.replicateAllUserTables()) {
      return isNamespacesEqual(rpc1.getExcludeNamespaces(), rpc2.getExcludeNamespaces()) &&
        isTableCFsEqual(rpc1.getExcludeTableCFsMap(), rpc2.getExcludeTableCFsMap());
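
The equality check above now treats the serial flag as significant. A tiny illustration of the added comparison, using only APIs from this patch (values invented):

import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

public class SerialFlagEqualitySketch {
  public static void main(String[] args) {
    ReplicationPeerConfig rpc1 = ReplicationPeerConfig.newBuilder()
      .setClusterKey("127.0.0.1:2181:/hbase").setSerial(true).build();
    ReplicationPeerConfig rpc2 = ReplicationPeerConfig.newBuilder()
      .setClusterKey("127.0.0.1:2181:/hbase").setSerial(false).build();

    // Mirrors the new guard: peer configs that differ only in the serial flag
    // are no longer considered equal.
    System.out.println(rpc1.isSerial() != rpc2.isSerial()); // true, so not equal
  }
}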

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
index 1ffc31f..c8017202 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
@@ -165,7 +165,7 @@ public class RegionStateStore {
      MetaTableAccessor.addLocation(put, regionLocation, openSeqNum, replicaId);
       // only update replication barrier for default replica
       if (regionInfo.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID &&
-        hasSerialReplicationScope(regionInfo.getTable())) {
+        hasGlobalReplicationScope(regionInfo.getTable())) {
         MetaTableAccessor.addReplicationBarrier(put, openSeqNum);
       }
       info.append(", openSeqNum=").append(openSeqNum);
@@ -224,7 +224,7 @@ public class RegionStateStore {
       ServerName serverName) throws IOException {
     TableDescriptor htd = getTableDescriptor(parent.getTable());
     long parentOpenSeqNum = HConstants.NO_SEQNUM;
-    if (htd.hasSerialReplicationScope()) {
+    if (htd.hasGlobalReplicationScope()) {
       parentOpenSeqNum = getOpenSeqNumForParentRegion(parent);
     }
    MetaTableAccessor.splitRegion(master.getConnection(), parent, parentOpenSeqNum, hriA, hriB,
@@ -239,7 +239,7 @@ public class RegionStateStore {
     TableDescriptor htd = getTableDescriptor(child.getTable());
     long regionAOpenSeqNum = -1L;
     long regionBOpenSeqNum = -1L;
-    if (htd.hasSerialReplicationScope()) {
+    if (htd.hasGlobalReplicationScope()) {
       regionAOpenSeqNum = getOpenSeqNumForParentRegion(hriA);
       regionBOpenSeqNum = getOpenSeqNumForParentRegion(hriB);
     }
@@ -261,12 +261,12 @@ public class RegionStateStore {
   // ==========================================================================
   //  Table Descriptors helpers
   // ==========================================================================
-  private boolean hasSerialReplicationScope(TableName tableName) throws IOException {
-    return hasSerialReplicationScope(getTableDescriptor(tableName));
+  private boolean hasGlobalReplicationScope(TableName tableName) throws IOException {
+    return hasGlobalReplicationScope(getTableDescriptor(tableName));
   }
 
-  private boolean hasSerialReplicationScope(TableDescriptor htd) {
-    return htd != null ? htd.hasSerialReplicationScope() : false;
+  private boolean hasGlobalReplicationScope(TableDescriptor htd) {
+    return htd != null ? htd.hasGlobalReplicationScope() : false;
   }
 
   private int getRegionReplication(TableDescriptor htd) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
index 6a2fbcf..f8722eb 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
@@ -37,31 +37,31 @@ public class ScopeWALEntryFilter implements WALEntryFilter, WALCellFilter {
 
   @Override
   public Entry filter(Entry entry) {
-    NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
-    if (scopes == null || scopes.isEmpty()) {
-      return null;
-    }
+    // Do not filter out an entire entry by replication scopes. As now we support serial
+    // replication, the sequence id of a marker is also needed by upper layer. We will filter out
+    // all the cells in the filterCell method below if the replication scopes is null or empty.
     return entry;
   }
 
+  private boolean hasGlobalScope(NavigableMap<byte[], Integer> scopes, byte[] family) {
+    Integer scope = scopes.get(family);
+    return scope != null && scope.intValue() == HConstants.REPLICATION_SCOPE_GLOBAL;
+  }
   @Override
   public Cell filterCell(Entry entry, Cell cell) {
-    final NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
-    // The scope will be null or empty if
-    // there's nothing to replicate in that WALEdit
-    byte[] fam = CellUtil.cloneFamily(cell);
+    NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
+    if (scopes == null || scopes.isEmpty()) {
+      return null;
+    }
+    byte[] family = CellUtil.cloneFamily(cell);
     if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
-      cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
+      return bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
         @Override
-        public boolean apply(byte[] fam) {
-          return !scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL;
+        public boolean apply(byte[] family) {
+          return !hasGlobalScope(scopes, family);
         }
       });
-    } else {
-      if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
-        return null;
-      }
     }
-    return cell;
+    return hasGlobalScope(scopes, family) ? cell : null;
   }
 }
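
The net effect of the filterCell change is a plain per-family scope lookup. A self-contained sketch of that decision (family names are invented):

import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;

public class ScopeCheckSketch {
  // Mirrors the hasGlobalScope helper added above: a cell survives only if its
  // family appears in the scopes map with REPLICATION_SCOPE_GLOBAL.
  static boolean keepCell(NavigableMap<byte[], Integer> scopes, byte[] family) {
    Integer scope = scopes.get(family);
    return scope != null && scope.intValue() == HConstants.REPLICATION_SCOPE_GLOBAL;
  }

  public static void main(String[] args) {
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(Bytes.toBytes("global_cf"), HConstants.REPLICATION_SCOPE_GLOBAL);
    scopes.put(Bytes.toBytes("local_cf"), HConstants.REPLICATION_SCOPE_LOCAL);

    System.out.println(keepCell(scopes, Bytes.toBytes("global_cf")));  // true
    System.out.println(keepCell(scopes, Bytes.toBytes("local_cf")));   // false
    System.out.println(keepCell(scopes, Bytes.toBytes("unknown_cf"))); // false
  }
}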

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 86e7f98..2f9cd56 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -382,6 +382,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
     return replicationPeer.isPeerEnabled();
   }
 
+  public boolean isSerial() {
+    return replicationPeer.getPeerConfig().isSerial();
+  }
+
   private void initialize() {
     int sleepMultiplier = 1;
     while (this.isSourceActive()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
index 95fc6a0..27b25c4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
@@ -72,18 +72,10 @@ class ReplicationSourceWALActionListener implements WALActionsListener {
     if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) {
       return;
     }
-    WALKeyImpl keyImpl = (WALKeyImpl) logKey;
-    // For serial replication we need to count all the sequence ids even for markers, so here we
-    // always need to retain the replication scopes to let the replication wal reader to know that
-    // we need serial replication. The ScopeWALEntryFilter will help filtering out the cell for
-    // WALEdit.METAFAMILY.
-    if (keyImpl.hasSerialReplicationScope()) {
-      return;
-    }
      // For replay, or if all the cells are markers, do not need to store replication scope.
     if (logEdit.isReplay() ||
      logEdit.getCells().stream().allMatch(c -> CellUtil.matchingFamily(c, WALEdit.METAFAMILY))) {
-      keyImpl.clearReplicationScope();
+      ((WALKeyImpl) logKey).clearReplicationScope();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index ad3baaf..da92a09 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -186,9 +186,9 @@ public class ReplicationSourceWALReader extends Thread {
      new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
     do {
       Entry entry = entryStream.peek();
-      boolean hasSerialReplicationScope = entry.getKey().hasSerialReplicationScope();
+      boolean isSerial = source.isSerial();
       boolean doFiltering = true;
-      if (hasSerialReplicationScope) {
+      if (isSerial) {
         if (firstCellInEntryBeforeFiltering == null) {
           assert !entry.getEdit().isEmpty() : "should not write empty edits";
          // Used to locate the region record in meta table. In WAL we only have the table name and
@@ -208,7 +208,7 @@ public class ReplicationSourceWALReader extends Thread {
         entry = filterEntry(entry);
       }
       if (entry != null) {
-        if (hasSerialReplicationScope) {
+        if (isSerial) {
          if (!serialReplicationChecker.canPush(entry, firstCellInEntryBeforeFiltering)) {
             if (batch.getLastWalPosition() > positionBefore) {
               // we have something that can push, break

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
index 9276359..b775d25 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
@@ -266,7 +266,7 @@ class SerialReplicationChecker {
       throws IOException, InterruptedException {
     byte[] row = CellUtil.cloneRow(firstCellInEdit);
     while (!canPush(entry, row)) {
-      LOG.debug("Can not push{}, wait", entry);
+      LOG.debug("Can not push {}, wait", entry);
       Thread.sleep(waitTimeMs);
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
index ac23d1d..8828239 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
@@ -419,14 +419,6 @@ public class WALKeyImpl implements WALKey {
     setReplicationScope(null);
   }
 
-  public boolean hasSerialReplicationScope() {
-    if (replicationScope == null || replicationScope.isEmpty()) {
-      return false;
-    }
-    return replicationScope.values().stream()
-      .anyMatch(scope -> scope.intValue() == HConstants.REPLICATION_SCOPE_SERIAL);
-  }
-
   /**
    * Marks that the cluster with the given clusterId has consumed the change
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
index 67a2551..f2c5e50 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.replication;
 
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -32,9 +33,9 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparatorImpl;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -48,7 +49,7 @@ import org.junit.experimental.categories.Category;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
-@Category({ReplicationTests.class, SmallTests.class})
+@Category({ ReplicationTests.class, SmallTests.class })
 public class TestReplicationWALEntryFilters {
 
   @ClassRule
@@ -65,7 +66,8 @@ public class TestReplicationWALEntryFilters {
     SystemTableWALEntryFilter filter = new SystemTableWALEntryFilter();
 
     // meta
-    WALKeyImpl key1 = new WALKeyImpl(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
+    WALKeyImpl key1 =
+      new WALKeyImpl(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
         TableName.META_TABLE_NAME, System.currentTimeMillis());
     Entry metaEntry = new Entry(key1, null);
 
@@ -96,12 +98,15 @@ public class TestReplicationWALEntryFilters {
     Entry userEntryEmpty = createEntry(null);
 
     // no scopes
-    assertEquals(null, filter.filter(userEntry));
+    // now we will not filter out entries without a replication scope since serial replication still
+    // need the sequence id, but the cells will all be filtered out.
+    assertTrue(filter.filter(userEntry).getEdit().isEmpty());
 
     // empty scopes
+    // ditto
     TreeMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
     userEntry = createEntry(scopes, a, b);
-    assertEquals(null, filter.filter(userEntry));
+    assertTrue(filter.filter(userEntry).getEdit().isEmpty());
 
     // different scope
     scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
index bf6c0c8..f8efcf0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
@@ -156,7 +156,8 @@ public class TestSerialReplication {
    // add in disable state, so later when enabling it all sources will start push together.
     UTIL.getAdmin().addReplicationPeer(PEER_ID,
       ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
-        .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build(),
+        .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).setSerial(true)
+        .build(),
       false);
   }
 
@@ -234,7 +235,7 @@ public class TestSerialReplication {
     TableName tableName = TableName.valueOf(name.getMethodName());
    UTIL.getAdmin().createTable(
      TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder
-        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_SERIAL).build()).build());
+        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
     UTIL.waitTableAvailable(tableName);
     try (Table table = UTIL.getConnection().getTable(tableName)) {
       for (int i = 0; i < 100; i++) {
@@ -273,7 +274,7 @@ public class TestSerialReplication {
     TableName tableName = TableName.valueOf(name.getMethodName());
    UTIL.getAdmin().createTable(
      TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder
-        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_SERIAL).build()).build());
+        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
     UTIL.waitTableAvailable(tableName);
     try (Table table = UTIL.getConnection().getTable(tableName)) {
       for (int i = 0; i < 100; i++) {
@@ -330,7 +331,7 @@ public class TestSerialReplication {
     UTIL.getAdmin().createTable(
       TableDescriptorBuilder.newBuilder(tableName)
         .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF)
-          .setScope(HConstants.REPLICATION_SCOPE_SERIAL).build())
+          .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
         .build(),
       new byte[][] { splitKey });
     UTIL.waitTableAvailable(tableName);
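
Putting the pieces together, the user-facing setup exercised by this test looks roughly like the sketch below (peer id, table, family, and cluster key are illustrative; a real deployment would point the cluster key at the slave cluster):

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.util.Bytes;

public class SerialPeerSetupSketch {
  // Serial ordering is now requested on the peer, while the column family only
  // needs the ordinary REPLICATION_SCOPE_GLOBAL.
  static void setupSerialPeer(Admin admin) throws java.io.IOException {
    byte[] cf = Bytes.toBytes("cf");
    admin.createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf("demo"))
      .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(cf)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .build());
    admin.addReplicationPeer("1", ReplicationPeerConfig.newBuilder()
      .setClusterKey("127.0.0.1:2181:/hbase") // illustrative; use the slave cluster key
      .setSerial(true)
      .build());
  }
}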
