HBASE-20117 Cleanup the unused replication barriers in meta table

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/644bfe36
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/644bfe36
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/644bfe36

Branch: refs/heads/branch-2
Commit: 644bfe36b297b2787bf07a46eb6f5085322edfa9
Parents: fedf3ca
Author: zhangduo <zhang...@apache.org>
Authored: Tue Mar 13 21:36:06 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Mon Apr 9 15:18:44 2018 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/MetaTableAccessor.java  |   2 +-
 .../hbase/replication/ReplicationUtils.java     |  56 +++-
 .../org/apache/hadoop/hbase/master/HMaster.java |  91 +++---
 .../cleaner/ReplicationBarrierCleaner.java      | 162 ++++++++++
 .../replication/ReplicationPeerManager.java     |  10 +
 .../NamespaceTableCfWALEntryFilter.java         |  39 +--
 .../cleaner/TestReplicationBarrierCleaner.java  | 293 +++++++++++++++++++
 .../TestSerialReplicationChecker.java           |   2 +-
 8 files changed, 565 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/644bfe36/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index 2a88b56..a800c1c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -2053,7 +2053,7 @@ public class MetaTableAccessor {
     return Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength());
   }
 
-  private static long[] getReplicationBarriers(Result result) {
+  public static long[] getReplicationBarriers(Result result) {
     return result.getColumnCells(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER)
       .stream().mapToLong(MetaTableAccessor::getReplicationBarrier).sorted().distinct().toArray();
   }
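
A hedged usage sketch (not part of the commit) of the accessor made public
above: with a Connection `conn` and a meta row key `regionName` assumed to be
in scope, every barrier sequence id recorded for a region can be read back
like this:

    // Fetch all versions of the barrier cell, then decode the sorted ids.
    try (Table meta = conn.getTable(TableName.META_TABLE_NAME)) {
      Result result = meta.get(new Get(regionName)
        .addFamily(HConstants.REPLICATION_BARRIER_FAMILY).readAllVersions());
      long[] barriers = MetaTableAccessor.getReplicationBarriers(result);
    }

MetaTableAccessor also exposes getReplicationBarrier(Connection, byte[]),
which the new test below uses for the same lookup.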

http://git-wip-us.apache.org/repos/asf/hbase/blob/644bfe36/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index 857b385..e2479e0 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -39,15 +39,6 @@ public final class ReplicationUtils {
   private ReplicationUtils() {
   }
 
-  /**
-   * @param c Configuration to look at
-   * @return True if replication for bulk load data is enabled.
-   */
-  public static boolean isReplicationForBulkLoadDataEnabled(final Configuration c) {
-    return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
-      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
-  }
-
   public static Configuration getPeerClusterConfiguration(ReplicationPeerConfig peerConfig,
       Configuration baseConf) throws ReplicationException {
     Configuration otherConf;
@@ -135,4 +126,51 @@ public final class ReplicationUtils {
         isTableCFsEqual(rpc1.getTableCFsMap(), rpc2.getTableCFsMap());
     }
   }
+
+  /**
+   * @param c Configuration to look at
+   * @return True if replication for bulk load data is enabled.
+   */
+  public static boolean isReplicationForBulkLoadDataEnabled(final Configuration c) {
+    return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
+  }
+
+  /**
+   * Returns whether we should replicate the given table.
+   */
+  public static boolean contains(ReplicationPeerConfig peerConfig, TableName tableName) {
+    String namespace = tableName.getNamespaceAsString();
+    if (peerConfig.replicateAllUserTables()) {
+      // replicate all user tables, but filter by exclude namespaces and table-cfs config
+      Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
+      if (excludeNamespaces != null && excludeNamespaces.contains(namespace)) {
+        return false;
+      }
+      Map<TableName, List<String>> excludedTableCFs = peerConfig.getExcludeTableCFsMap();
+      // trap here, must check existence first since HashMap allows null value.
+      if (excludedTableCFs == null || !excludedTableCFs.containsKey(tableName)) {
+        return true;
+      }
+      List<String> cfs = excludedTableCFs.get(tableName);
+      // if cfs is null or empty then we can make sure that we do not need to replicate this table,
+      // otherwise, we may still need to replicate the table but filter out some families.
+      return cfs != null && !cfs.isEmpty();
+    } else {
+      // Not replicate all user tables, so filter by namespaces and table-cfs config
+      Set<String> namespaces = peerConfig.getNamespaces();
+      Map<TableName, List<String>> tableCFs = peerConfig.getTableCFsMap();
+
+      if (namespaces == null && tableCFs == null) {
+        return false;
+      }
+
+      // First filter by namespaces config
+      // If table's namespace in peer config, all the tables data are applicable for replication
+      if (namespaces != null && namespaces.contains(namespace)) {
+        return true;
+      }
+      return tableCFs != null && tableCFs.containsKey(tableName);
+    }
+  }
 }
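
A hedged sketch (illustration only, not part of the commit) of how contains()
behaves for a replicate-all peer, matching the "trap" comment above: an
exclude table-cfs entry whose value is null or an empty list excludes the
whole table, while a non-empty cf list keeps the table replicable. The table
names are made up:

    // Peer replicates all user tables, but excludes all of "test:excluded".
    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
        .setReplicateAllUserTables(true)
        .setExcludeTableCFsMap(
          Collections.singletonMap(TableName.valueOf("test:excluded"), null))
        .build();
    // Null cf list under replicate-all semantics: the whole table is excluded.
    assert !ReplicationUtils.contains(peerConfig, TableName.valueOf("test:excluded"));
    // Any other user table is still replicated.
    assert ReplicationUtils.contains(peerConfig, TableName.valueOf("test:other"));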

http://git-wip-us.apache.org/repos/asf/hbase/blob/644bfe36/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 7d751fb..6d0b58b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.PleaseHoldException;
 import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
+import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.ServerMetricsBuilder;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableDescriptors;
@@ -109,6 +110,7 @@ import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
 import org.apache.hadoop.hbase.master.cleaner.CleanerChore;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
+import org.apache.hadoop.hbase.master.cleaner.ReplicationBarrierCleaner;
 import org.apache.hadoop.hbase.master.locking.LockManager;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
@@ -371,6 +373,7 @@ public class HMaster extends HRegionServer implements MasterServices {
   CatalogJanitor catalogJanitorChore;
   private LogCleaner logCleaner;
   private HFileCleaner hfileCleaner;
+  private ReplicationBarrierCleaner replicationBarrierCleaner;
   private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
   private MobCompactionChore mobCompactChore;
   private MasterMobCompactionThread mobCompactThread;
@@ -1179,19 +1182,30 @@ public class HMaster extends HRegionServer implements MasterServices {
          getMasterWalManager().getOldLogDir());
     getChoreService().scheduleChore(logCleaner);
 
-   //start the hfile archive cleaner thread
+    // start the hfile archive cleaner thread
     Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
     Map<String, Object> params = new HashMap<>();
     params.put(MASTER, this);
     this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
         .getFileSystem(), archiveDir, params);
     getChoreService().scheduleChore(hfileCleaner);
+
+    replicationBarrierCleaner =
+      new ReplicationBarrierCleaner(conf, this, getConnection(), replicationPeerManager);
+    getChoreService().scheduleChore(replicationBarrierCleaner);
+
     serviceStarted = true;
     if (LOG.isTraceEnabled()) {
       LOG.trace("Started service threads");
     }
   }
 
+  private void cancelChore(ScheduledChore chore) {
+    if (chore != null) {
+      chore.cancel();
+    }
+  }
+
   @Override
   protected void stopServiceThreads() {
     if (masterJettyServer != null) {
@@ -1205,24 +1219,33 @@ public class HMaster extends HRegionServer implements MasterServices {
     super.stopServiceThreads();
     stopChores();
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Stopping service threads");
-    }
+    LOG.debug("Stopping service threads");
 
-    // Clean up and close up shop
-    if (this.logCleaner != null) this.logCleaner.cancel(true);
-    if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
-    if (this.quotaManager != null) this.quotaManager.stop();
+    if (this.quotaManager != null) {
+      this.quotaManager.stop();
+    }
 
-    if (this.activeMasterManager != null) this.activeMasterManager.stop();
-    if (this.serverManager != null) this.serverManager.stop();
-    if (this.assignmentManager != null) this.assignmentManager.stop();
+    if (this.activeMasterManager != null) {
+      this.activeMasterManager.stop();
+    }
+    if (this.serverManager != null) {
+      this.serverManager.stop();
+    }
+    if (this.assignmentManager != null) {
+      this.assignmentManager.stop();
+    }
 
     stopProcedureExecutor();
 
-    if (this.walManager != null) this.walManager.stop();
-    if (this.fileSystemManager != null) this.fileSystemManager.stop();
-    if (this.mpmHost != null) this.mpmHost.stop("server shutting down.");
+    if (this.walManager != null) {
+      this.walManager.stop();
+    }
+    if (this.fileSystemManager != null) {
+      this.fileSystemManager.stop();
+    }
+    if (this.mpmHost != null) {
+      this.mpmHost.stop("server shutting down.");
+    }
   }
 
   private void startProcedureExecutor() throws IOException {
@@ -1261,37 +1284,21 @@ public class HMaster extends HRegionServer implements MasterServices {
   }
 
   private void stopChores() {
-    if (this.expiredMobFileCleanerChore != null) {
-      this.expiredMobFileCleanerChore.cancel(true);
-    }
-    if (this.mobCompactChore != null) {
-      this.mobCompactChore.cancel(true);
-    }
-    if (this.balancerChore != null) {
-      this.balancerChore.cancel(true);
-    }
-    if (this.normalizerChore != null) {
-      this.normalizerChore.cancel(true);
-    }
-    if (this.clusterStatusChore != null) {
-      this.clusterStatusChore.cancel(true);
-    }
-    if (this.catalogJanitorChore != null) {
-      this.catalogJanitorChore.cancel(true);
-    }
-    if (this.clusterStatusPublisherChore != null){
-      clusterStatusPublisherChore.cancel(true);
-    }
+    cancelChore(this.expiredMobFileCleanerChore);
+    cancelChore(this.mobCompactChore);
+    cancelChore(this.balancerChore);
+    cancelChore(this.normalizerChore);
+    cancelChore(this.clusterStatusChore);
+    cancelChore(this.catalogJanitorChore);
+    cancelChore(this.clusterStatusPublisherChore);
     if (this.mobCompactThread != null) {
       this.mobCompactThread.close();
     }
-
-    if (this.quotaObserverChore != null) {
-      quotaObserverChore.cancel();
-    }
-    if (this.snapshotQuotaChore != null) {
-      snapshotQuotaChore.cancel();
-    }
+    cancelChore(this.quotaObserverChore);
+    cancelChore(this.snapshotQuotaChore);
+    cancelChore(this.logCleaner);
+    cancelChore(this.hfileCleaner);
+    cancelChore(this.replicationBarrierCleaner);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/644bfe36/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationBarrierCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationBarrierCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationBarrierCleaner.java
new file mode 100644
index 0000000..16b8fc5
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationBarrierCleaner.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to clean the useless barriers in {@link HConstants#REPLICATION_BARRIER_FAMILY_STR} family
+ * in meta table.
+@InterfaceAudience.Private
+public class ReplicationBarrierCleaner extends ScheduledChore {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReplicationBarrierCleaner.class);
+
+  private static final String REPLICATION_BARRIER_CLEANER_INTERVAL =
+    "hbase.master.cleaner.replication.barrier.interval";
+
+  // 12 hours. Usually regions will not be moved, so the barriers are rarely updated. Use a
+  // large interval.
+  private static final int DEFAULT_REPLICATION_BARRIER_CLEANER_INTERVAL = 12 * 60 * 60 * 1000;
+
+  private final Connection conn;
+
+  private final ReplicationPeerManager peerManager;
+
+  public ReplicationBarrierCleaner(Configuration conf, Stoppable stopper, Connection conn,
+      ReplicationPeerManager peerManager) {
+    super("ReplicationBarrierCleaner", stopper, conf.getInt(REPLICATION_BARRIER_CLEANER_INTERVAL,
+      DEFAULT_REPLICATION_BARRIER_CLEANER_INTERVAL));
+    this.conn = conn;
+    this.peerManager = peerManager;
+  }
+
+  @Override
+  protected void chore() {
+    long totalRows = 0;
+    long cleanedRows = 0;
+    long deletedRows = 0;
+    long deletedBarriers = 0;
+    TableName tableName = null;
+    List<String> peerIds = null;
+    try (Table metaTable = conn.getTable(TableName.META_TABLE_NAME);
+        ResultScanner scanner = metaTable.getScanner(
+          new Scan().addFamily(HConstants.REPLICATION_BARRIER_FAMILY).readAllVersions())) {
+      for (;;) {
+        Result result = scanner.next();
+        if (result == null) {
+          break;
+        }
+        totalRows++;
+        long[] barriers = MetaTableAccessor.getReplicationBarriers(result);
+        if (barriers.length == 0) {
+          continue;
+        }
+        byte[] regionName = result.getRow();
+        TableName tn = RegionInfo.getTable(regionName);
+        if (!tn.equals(tableName)) {
+          tableName = tn;
+          peerIds = peerManager.getSerialPeerIdsBelongsTo(tableName);
+        }
+        if (peerIds.isEmpty()) {
+          // no serial replication, only keep the newest barrier
+          Cell cell = result.getColumnLatestCell(HConstants.REPLICATION_BARRIER_FAMILY,
+            HConstants.SEQNUM_QUALIFIER);
+          metaTable.delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY,
+            cell.getTimestamp() - 1));
+          cleanedRows++;
+          deletedBarriers += barriers.length - 1;
+          continue;
+        }
+        String encodedRegionName = RegionInfo.encodeRegionName(regionName);
+        long pushedSeqId = Long.MAX_VALUE;
+        for (String peerId : peerIds) {
+          pushedSeqId = Math.min(pushedSeqId,
+            peerManager.getQueueStorage().getLastSequenceId(encodedRegionName, peerId));
+        }
+        int index = Arrays.binarySearch(barriers, pushedSeqId);
+        if (index == -1) {
+          // beyond the first barrier, usually this should not happen but anyway let's add a
+          // check for it.
+          continue;
+        }
+        if (index < 0) {
+          index = -index - 1;
+        } else {
+          index++;
+        }
+        // A special case for merged/split region, where we are in the last closed range and
+        // the pushedSeqId is the last barrier minus 1.
+        if (index == barriers.length - 1 && pushedSeqId == barriers[barriers.length - 1] - 1) {
+          // check if the region has already been removed, i.e., no catalog family
+          if (!metaTable.exists(new Get(regionName).addFamily(HConstants.CATALOG_FAMILY))) {
+            metaTable
+              .delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY));
+            deletedRows++;
+            deletedBarriers += barriers.length;
+            continue;
+          }
+        }
+        // the barriers before 'index - 1' (exclusive) can be safely removed. See the algorithm
+        // in SerialReplicationChecker for more details.
+        if (index - 1 > 0) {
+          List<Cell> cells = result.getColumnCells(HConstants.REPLICATION_BARRIER_FAMILY,
+            HConstants.SEQNUM_QUALIFIER);
+          // All barriers before this cell (exclusive) can be removed
+          Cell cell = cells.get(cells.size() - index);
+          metaTable.delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY,
+            cell.getTimestamp() - 1));
+          cleanedRows++;
+          deletedBarriers += index - 1;
+        }
+      }
+    } catch (ReplicationException | IOException e) {
+      LOG.warn("Failed to clean up replication barrier", e);
+    }
+    if (totalRows > 0) {
+      LOG.info(
+        "Cleanup replication barriers: " +
+          "totalRows {}, cleanedRows {}, deletedRows {}, deletedBarriers {}",
+        totalRows, cleanedRows, deletedRows, deletedBarriers);
+    }
+  }
+
+}
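
To make the Arrays.binarySearch() bookkeeping in the chore above concrete,
here is a hedged, self-contained sketch (not part of the commit) using the
barrier list from the new unit test below. The chore's period itself is
controlled by hbase.master.cleaner.replication.barrier.interval, defined above:

    import java.util.Arrays;

    public class BarrierIndexDemo {
      public static void main(String[] args) {
        long[] barriers = { 10, 20, 30, 40, 50, 60 };
        long pushedSeqId = 25; // min last-pushed sequence id across serial peers
        int index = Arrays.binarySearch(barriers, pushedSeqId);
        // Not found: binarySearch returns -(insertion point) - 1, here -3.
        if (index < 0) {
          index = -index - 1; // first barrier strictly greater than pushedSeqId
        } else {
          index++; // exact hit: the open range starts at the next barrier
        }
        // Barriers before barriers[index - 1] (exclusive) are no longer needed:
        // here index == 2, so only the barrier 10 may be deleted.
        System.out.println("index=" + index + ", deletable=" + (index - 1));
      }
    }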

http://git-wip-us.apache.org/repos/asf/hbase/blob/644bfe36/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 19cd89d..1e93373 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -327,6 +327,16 @@ public class ReplicationPeerManager {
     }
   }
 
+  public List<String> getSerialPeerIdsBelongsTo(TableName tableName) {
+    return peers.values().stream().filter(p -> p.getPeerConfig().isSerial())
+      .filter(p -> ReplicationUtils.contains(p.getPeerConfig(), tableName)).map(p -> p.getPeerId())
+      .collect(Collectors.toList());
+  }
+
+  public ReplicationQueueStorage getQueueStorage() {
+    return queueStorage;
+  }
+
   public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
       throws ReplicationException {
     ReplicationPeerStorage peerStorage =
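
A hedged note on the two accessors added above: getSerialPeerIdsBelongsTo()
returns only peers that are both serial and configured to replicate the given
table, which is exactly the set ReplicationBarrierCleaner has to consult. A
sketch with made-up peers:

    // peer "1": serial, replicates all user tables     -> returned
    // peer "2": serial, namespace filter excludes "t"  -> filtered out
    // peer "3": not serial                             -> filtered out
    List<String> ids = peerManager.getSerialPeerIdsBelongsTo(TableName.valueOf("t"));
    // ids would contain only "1" in this setup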

http://git-wip-us.apache.org/repos/asf/hbase/blob/644bfe36/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
index 08c9f37..3a3200a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.TableName;
@@ -53,44 +52,10 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi
 
   @Override
   public Entry filter(Entry entry) {
-    TableName tabName = entry.getKey().getTableName();
-    String namespace = tabName.getNamespaceAsString();
-    ReplicationPeerConfig peerConfig = this.peer.getPeerConfig();
-
-    if (peerConfig.replicateAllUserTables()) {
-      // replicate all user tables, but filter by exclude namespaces config
-      Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
-
-      // return null(prevent replicating) if logKey's table is in this peer's
-      // exclude namespaces list
-      if (excludeNamespaces != null && excludeNamespaces.contains(namespace)) {
-        return null;
-      }
-
+    if (ReplicationUtils.contains(this.peer.getPeerConfig(), entry.getKey().getTableName())) {
       return entry;
     } else {
-      // Not replicate all user tables, so filter by namespaces and table-cfs config
-      Set<String> namespaces = peerConfig.getNamespaces();
-      Map<TableName, List<String>> tableCFs = peerConfig.getTableCFsMap();
-
-      if (namespaces == null && tableCFs == null) {
-        return null;
-      }
-
-      // First filter by namespaces config
-      // If table's namespace in peer config, all the tables data are applicable for replication
-      if (namespaces != null && namespaces.contains(namespace)) {
-        return entry;
-      }
-
-      // Then filter by table-cfs config
-      // return null(prevent replicating) if logKey's table isn't in this peer's
-      // replicable tables list
-      if (tableCFs == null || !tableCFs.containsKey(tabName)) {
-        return null;
-      }
-
-      return entry;
+      return null;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/644bfe36/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationBarrierCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationBarrierCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationBarrierCleaner.java
new file mode 100644
index 0000000..671bc22
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationBarrierCleaner.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestReplicationBarrierCleaner {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReplicationBarrierCleaner.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBarrierCleaner.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  @Rule
+  public final TestName name = new TestName();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
+        ResultScanner scanner = table.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY)
+          .addFamily(HConstants.REPLICATION_BARRIER_FAMILY).setFilter(new FirstKeyOnlyFilter()))) {
+      for (;;) {
+        Result result = scanner.next();
+        if (result == null) {
+          break;
+        }
+        TableName tableName = RegionInfo.getTable(result.getRow());
+        if (!tableName.isSystemTable()) {
+          table.delete(new Delete(result.getRow()));
+        }
+      }
+    }
+  }
+
+  private ReplicationPeerManager create(ReplicationQueueStorage queueStorage,
+      List<String> firstPeerIds, @SuppressWarnings("unchecked") List<String>... peerIds) {
+    ReplicationPeerManager peerManager = mock(ReplicationPeerManager.class);
+    if (queueStorage != null) {
+      when(peerManager.getQueueStorage()).thenReturn(queueStorage);
+    }
+    if (peerIds.length == 0) {
+      when(peerManager.getSerialPeerIdsBelongsTo(any(TableName.class))).thenReturn(firstPeerIds);
+    } else {
+      when(peerManager.getSerialPeerIdsBelongsTo(any(TableName.class))).thenReturn(firstPeerIds,
+        peerIds);
+    }
+    return peerManager;
+  }
+
+  private ReplicationQueueStorage create(Long lastPushedSeqId, Long... lastPushedSeqIds)
+      throws ReplicationException {
+    ReplicationQueueStorage queueStorage = mock(ReplicationQueueStorage.class);
+    if (lastPushedSeqIds.length == 0) {
+      when(queueStorage.getLastSequenceId(anyString(), anyString())).thenReturn(lastPushedSeqId);
+    } else {
+      when(queueStorage.getLastSequenceId(anyString(), anyString())).thenReturn(lastPushedSeqId,
+        lastPushedSeqIds);
+    }
+    return queueStorage;
+  }
+
+  private ReplicationBarrierCleaner create(ReplicationPeerManager peerManager) throws IOException {
+    return new ReplicationBarrierCleaner(UTIL.getConfiguration(), new WarnOnlyStoppable(),
+      UTIL.getConnection(), peerManager);
+  }
+
+  private void addBarrier(RegionInfo region, long... barriers) throws IOException {
+    Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
+    for (int i = 0; i < barriers.length; i++) {
+      put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
+        put.getTimeStamp() - barriers.length + i, Bytes.toBytes(barriers[i]));
+    }
+    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
+      table.put(put);
+    }
+  }
+
+  private void fillCatalogFamily(RegionInfo region) throws IOException {
+    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
+      table.put(new Put(region.getRegionName()).addColumn(HConstants.CATALOG_FAMILY,
+        Bytes.toBytes("whatever"), Bytes.toBytes("whatever")));
+    }
+  }
+
+  private void clearCatalogFamily(RegionInfo region) throws IOException {
+    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
+      table.delete(new Delete(region.getRegionName()).addFamily(HConstants.CATALOG_FAMILY));
+    }
+  }
+
+  @Test
+  public void testNothing() throws IOException {
+    ReplicationPeerManager peerManager = mock(ReplicationPeerManager.class);
+    ReplicationBarrierCleaner cleaner = create(peerManager);
+    cleaner.chore();
+    verify(peerManager, never()).getSerialPeerIdsBelongsTo(any(TableName.class));
+    verify(peerManager, never()).getQueueStorage();
+  }
+
+  @Test
+  public void testCleanNoPeers() throws IOException {
+    TableName tableName1 = TableName.valueOf(name.getMethodName() + "_1");
+    RegionInfo region11 =
+      RegionInfoBuilder.newBuilder(tableName1).setEndKey(Bytes.toBytes(1)).build();
+    addBarrier(region11, 10, 20, 30, 40, 50, 60);
+    RegionInfo region12 =
+      RegionInfoBuilder.newBuilder(tableName1).setStartKey(Bytes.toBytes(1)).build();
+    addBarrier(region12, 20, 30, 40, 50, 60, 70);
+
+    TableName tableName2 = TableName.valueOf(name.getMethodName() + "_2");
+    RegionInfo region21 =
+      RegionInfoBuilder.newBuilder(tableName2).setEndKey(Bytes.toBytes(1)).build();
+    addBarrier(region21, 100, 200, 300, 400);
+    RegionInfo region22 =
+      RegionInfoBuilder.newBuilder(tableName2).setStartKey(Bytes.toBytes(1)).build();
+    addBarrier(region22, 200, 300, 400, 500, 600);
+
+    @SuppressWarnings("unchecked")
+    ReplicationPeerManager peerManager =
+      create(null, Collections.emptyList(), Collections.emptyList());
+    ReplicationBarrierCleaner cleaner = create(peerManager);
+    cleaner.chore();
+
+    // should never call this method
+    verify(peerManager, never()).getQueueStorage();
+    // should only be called twice although we have 4 regions to clean
+    verify(peerManager, times(2)).getSerialPeerIdsBelongsTo(any(TableName.class));
+
+    assertArrayEquals(new long[] { 60 },
+      MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region11.getRegionName()));
+    assertArrayEquals(new long[] { 70 },
+      MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region12.getRegionName()));
+
+    assertArrayEquals(new long[] { 400 },
+      MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region21.getRegionName()));
+    assertArrayEquals(new long[] { 600 },
+      MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region22.getRegionName()));
+  }
+
+  @Test
+  public void testDeleteBarriers() throws IOException, ReplicationException {
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
+    addBarrier(region, 10, 20, 30, 40, 50, 60);
+    // two peers
+    ReplicationQueueStorage queueStorage = create(-1L, 2L, 15L, 25L, 20L, 25L, 65L, 55L, 70L, 70L);
+    List<String> peerIds = Lists.newArrayList("1", "2");
+
+    @SuppressWarnings("unchecked")
+    ReplicationPeerManager peerManager =
+      create(queueStorage, peerIds, peerIds, peerIds, peerIds, peerIds);
+    ReplicationBarrierCleaner cleaner = create(peerManager);
+
+    // beyond the first barrier, no deletion
+    cleaner.chore();
+    assertArrayEquals(new long[] { 10, 20, 30, 40, 50, 60 },
+      MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName()));
+
+    // in the first range, still no deletion
+    cleaner.chore();
+    assertArrayEquals(new long[] { 10, 20, 30, 40, 50, 60 },
+      MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName()));
+
+    // in the second range, 10 is deleted
+    cleaner.chore();
+    assertArrayEquals(new long[] { 20, 30, 40, 50, 60 },
+      MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName()));
+
+    // between 50 and 60, so the barriers before 50 will be deleted
+    cleaner.chore();
+    assertArrayEquals(new long[] { 50, 60 },
+      MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName()));
+
+    // in the last open range, 50 is deleted
+    cleaner.chore();
+    assertArrayEquals(new long[] { 60 },
+      MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName()));
+  }
+
+  @Test
+  public void testDeleteRowForDeletedRegion() throws IOException, ReplicationException {
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
+    addBarrier(region, 40, 50, 60);
+    fillCatalogFamily(region);
+
+    ReplicationQueueStorage queueStorage = create(59L);
+    @SuppressWarnings("unchecked")
+    ReplicationPeerManager peerManager = create(queueStorage, Lists.newArrayList("1"));
+    ReplicationBarrierCleaner cleaner = create(peerManager);
+
+    // we have something in catalog family, so only delete 40
+    cleaner.chore();
+    assertArrayEquals(new long[] { 50, 60 },
+      MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName()));
+
+    // No catalog family, then we should remove the whole row
+    clearCatalogFamily(region);
+    cleaner.chore();
+    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
+      assertFalse(table
+        .exists(new Get(region.getRegionName()).addFamily(HConstants.REPLICATION_BARRIER_FAMILY)));
+    }
+  }
+
+  private static class WarnOnlyStoppable implements Stoppable {
+    @Override
+    public void stop(String why) {
+      LOG.warn("TestReplicationBarrierCleaner received stop, ignoring. Reason: " + why);
+    }
+
+    @Override
+    public boolean isStopped() {
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/644bfe36/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
index 58e9543..29749bd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
@@ -157,7 +157,7 @@ public class TestSerialReplicationChecker {
     }
     for (int i = 0; i < barriers.length; i++) {
       put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
-        put.getTimeStamp() - i, Bytes.toBytes(barriers[i]));
+        put.getTimeStamp() - barriers.length + i, Bytes.toBytes(barriers[i]));
     }
     try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
       table.put(put);
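
Why this one-line fix matters: all barrier cells share a single qualifier, so
their relative timestamps decide which cell getColumnLatestCell() returns. A
hedged illustration with made-up values (base timestamp 1000, three barriers):

    long base = 1000L;                 // hypothetical put.getTimeStamp()
    long[] barriers = { 10, 20, 30 };
    for (int i = 0; i < barriers.length; i++) {
      long oldTs = base - i;                   // 1000, 999, 998: lowest barrier is newest
      long newTs = base - barriers.length + i; // 997, 998, 999: highest barrier is newest
    }

With the old scheme the newest cell carried the lowest barrier, so readers
such as ReplicationBarrierCleaner would see the wrong "latest" value; the new
scheme makes timestamps ascend with the barrier sequence ids, matching the
addBarrier() helper in TestReplicationBarrierCleaner above.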
