HBASE-20117 Cleanup the unused replication barriers in meta table
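Adds a new master chore, ReplicationBarrierCleaner, which periodically scans the rep_barrier family of the meta table (every 12 hours by default, configurable via hbase.master.cleaner.replication.barrier.interval). For regions whose table has no serial replication peer, only the newest barrier is kept. Otherwise, all barriers below the minimum last-pushed sequence id across the table's serial peers are deleted, and the whole row is deleted for split/merged parent regions that are already gone from the catalog family. The namespace and table-cfs matching logic is also factored out of NamespaceTableCfWALEntryFilter into ReplicationUtils.contains() so that the cleaner and the filter share one implementation.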
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/644bfe36 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/644bfe36 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/644bfe36 Branch: refs/heads/branch-2 Commit: 644bfe36b297b2787bf07a46eb6f5085322edfa9 Parents: fedf3ca Author: zhangduo <zhang...@apache.org> Authored: Tue Mar 13 21:36:06 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Mon Apr 9 15:18:44 2018 +0800 ---------------------------------------------------------------------- .../apache/hadoop/hbase/MetaTableAccessor.java | 2 +- .../hbase/replication/ReplicationUtils.java | 56 +++- .../org/apache/hadoop/hbase/master/HMaster.java | 91 +++--- .../cleaner/ReplicationBarrierCleaner.java | 162 ++++++++++ .../replication/ReplicationPeerManager.java | 10 + .../NamespaceTableCfWALEntryFilter.java | 39 +-- .../cleaner/TestReplicationBarrierCleaner.java | 293 +++++++++++++++++++ .../TestSerialReplicationChecker.java | 2 +- 8 files changed, 565 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/644bfe36/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java index 2a88b56..a800c1c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java @@ -2053,7 +2053,7 @@ public class MetaTableAccessor { return Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()); } - private static long[] getReplicationBarriers(Result result) { + public static long[] getReplicationBarriers(Result result) { return result.getColumnCells(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER) .stream().mapToLong(MetaTableAccessor::getReplicationBarrier).sorted().distinct().toArray(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/644bfe36/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java index 857b385..e2479e0 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java @@ -39,15 +39,6 @@ public final class ReplicationUtils { private ReplicationUtils() { } - /** - * @param c Configuration to look at - * @return True if replication for bulk load data is enabled. 
- */ - public static boolean isReplicationForBulkLoadDataEnabled(final Configuration c) { - return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, - HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); - } - public static Configuration getPeerClusterConfiguration(ReplicationPeerConfig peerConfig, Configuration baseConf) throws ReplicationException { Configuration otherConf; @@ -135,4 +126,51 @@ public final class ReplicationUtils { isTableCFsEqual(rpc1.getTableCFsMap(), rpc2.getTableCFsMap()); } } + + /** + * @param c Configuration to look at + * @return True if replication for bulk load data is enabled. + */ + public static boolean isReplicationForBulkLoadDataEnabled(final Configuration c) { + return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, + HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); + } + + /** + * Returns whether we should replicate the given table. + */ + public static boolean contains(ReplicationPeerConfig peerConfig, TableName tableName) { + String namespace = tableName.getNamespaceAsString(); + if (peerConfig.replicateAllUserTables()) { + // replicate all user tables, but filter by exclude namespaces and table-cfs config + Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces(); + if (excludeNamespaces != null && excludeNamespaces.contains(namespace)) { + return false; + } + Map<TableName, List<String>> excludedTableCFs = peerConfig.getExcludeTableCFsMap(); + // trap here: we must check key existence first, since HashMap allows null values. + if (excludedTableCFs == null || !excludedTableCFs.containsKey(tableName)) { + return true; + } + List<String> cfs = excludedTableCFs.get(tableName); + // if cfs is null or empty then we are sure we do not need to replicate this table; + // otherwise, we may still need to replicate the table but filter out some families.
+ return cfs != null && !cfs.isEmpty(); + } else { + // Not replicating all user tables, so filter by namespaces and table-cfs config + Set<String> namespaces = peerConfig.getNamespaces(); + Map<TableName, List<String>> tableCFs = peerConfig.getTableCFsMap(); + + if (namespaces == null && tableCFs == null) { + return false; + } + + // First filter by namespaces config + // If the table's namespace is in the peer config, all the table's data is applicable for replication + if (namespaces != null && namespaces.contains(namespace)) { + return true; + } + return tableCFs != null && tableCFs.containsKey(tableName); + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/644bfe36/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 7d751fb..6d0b58b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.PleaseHoldException; import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.ServerMetricsBuilder; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableDescriptors; @@ -109,6 +110,7 @@ import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; import org.apache.hadoop.hbase.master.cleaner.CleanerChore; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.master.cleaner.LogCleaner; +import org.apache.hadoop.hbase.master.cleaner.ReplicationBarrierCleaner; import org.apache.hadoop.hbase.master.locking.LockManager; import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan; import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType; @@ -371,6 +373,7 @@ public class HMaster extends HRegionServer implements MasterServices { CatalogJanitor catalogJanitorChore; private LogCleaner logCleaner; private HFileCleaner hfileCleaner; + private ReplicationBarrierCleaner replicationBarrierCleaner; private ExpiredMobFileCleanerChore expiredMobFileCleanerChore; private MobCompactionChore mobCompactChore; private MasterMobCompactionThread mobCompactThread; @@ -1179,19 +1182,30 @@ public class HMaster extends HRegionServer implements MasterServices { getMasterWalManager().getOldLogDir()); getChoreService().scheduleChore(logCleaner); - //start the hfile archive cleaner thread + // start the hfile archive cleaner thread Path archiveDir = HFileArchiveUtil.getArchivePath(conf); Map<String, Object> params = new HashMap<>(); params.put(MASTER, this); this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem() .getFileSystem(), archiveDir, params); getChoreService().scheduleChore(hfileCleaner); + + replicationBarrierCleaner = + new ReplicationBarrierCleaner(conf, this, getConnection(), replicationPeerManager); + getChoreService().scheduleChore(replicationBarrierCleaner); + serviceStarted = true; if (LOG.isTraceEnabled()) { LOG.trace("Started service threads"); } } + private void cancelChore(ScheduledChore chore) { + if (chore != null) { + chore.cancel(); + } + } + @Override protected void stopServiceThreads() { if
(masterJettyServer != null) { @@ -1205,24 +1219,33 @@ public class HMaster extends HRegionServer implements MasterServices { super.stopServiceThreads(); stopChores(); - if (LOG.isDebugEnabled()) { - LOG.debug("Stopping service threads"); - } + LOG.debug("Stopping service threads"); - // Clean up and close up shop - if (this.logCleaner != null) this.logCleaner.cancel(true); - if (this.hfileCleaner != null) this.hfileCleaner.cancel(true); - if (this.quotaManager != null) this.quotaManager.stop(); + if (this.quotaManager != null) { + this.quotaManager.stop(); + } - if (this.activeMasterManager != null) this.activeMasterManager.stop(); - if (this.serverManager != null) this.serverManager.stop(); - if (this.assignmentManager != null) this.assignmentManager.stop(); + if (this.activeMasterManager != null) { + this.activeMasterManager.stop(); + } + if (this.serverManager != null) { + this.serverManager.stop(); + } + if (this.assignmentManager != null) { + this.assignmentManager.stop(); + } stopProcedureExecutor(); - if (this.walManager != null) this.walManager.stop(); - if (this.fileSystemManager != null) this.fileSystemManager.stop(); - if (this.mpmHost != null) this.mpmHost.stop("server shutting down."); + if (this.walManager != null) { + this.walManager.stop(); + } + if (this.fileSystemManager != null) { + this.fileSystemManager.stop(); + } + if (this.mpmHost != null) { + this.mpmHost.stop("server shutting down."); + } } private void startProcedureExecutor() throws IOException { @@ -1261,37 +1284,21 @@ public class HMaster extends HRegionServer implements MasterServices { } private void stopChores() { - if (this.expiredMobFileCleanerChore != null) { - this.expiredMobFileCleanerChore.cancel(true); - } - if (this.mobCompactChore != null) { - this.mobCompactChore.cancel(true); - } - if (this.balancerChore != null) { - this.balancerChore.cancel(true); - } - if (this.normalizerChore != null) { - this.normalizerChore.cancel(true); - } - if (this.clusterStatusChore != null) { - this.clusterStatusChore.cancel(true); - } - if (this.catalogJanitorChore != null) { - this.catalogJanitorChore.cancel(true); - } - if (this.clusterStatusPublisherChore != null){ - clusterStatusPublisherChore.cancel(true); - } + cancelChore(this.expiredMobFileCleanerChore); + cancelChore(this.mobCompactChore); + cancelChore(this.balancerChore); + cancelChore(this.normalizerChore); + cancelChore(this.clusterStatusChore); + cancelChore(this.catalogJanitorChore); + cancelChore(this.clusterStatusPublisherChore); if (this.mobCompactThread != null) { this.mobCompactThread.close(); } - - if (this.quotaObserverChore != null) { - quotaObserverChore.cancel(); - } - if (this.snapshotQuotaChore != null) { - snapshotQuotaChore.cancel(); - } + cancelChore(this.quotaObserverChore); + cancelChore(this.snapshotQuotaChore); + cancelChore(this.logCleaner); + cancelChore(this.hfileCleaner); + cancelChore(this.replicationBarrierCleaner); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/644bfe36/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationBarrierCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationBarrierCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationBarrierCleaner.java new file mode 100644 index 0000000..16b8fc5 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationBarrierCleaner.java @@ -0,0
+1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.cleaner; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Used to clean the useless barriers in the {@link HConstants#REPLICATION_BARRIER_FAMILY_STR} family in + * the meta table. + */ +@InterfaceAudience.Private +public class ReplicationBarrierCleaner extends ScheduledChore { + + private static final Logger LOG = LoggerFactory.getLogger(ReplicationBarrierCleaner.class); + + private static final String REPLICATION_BARRIER_CLEANER_INTERVAL = + "hbase.master.cleaner.replication.barrier.interval"; + + // 12 hours. Usually regions will not be moved, so the barriers are rarely updated. Use a large + // interval.
+ private static final int DEFAULT_REPLICATION_BARRIER_CLEANER_INTERVAL = 12 * 60 * 60 * 1000; + + private final Connection conn; + + private final ReplicationPeerManager peerManager; + + public ReplicationBarrierCleaner(Configuration conf, Stoppable stopper, Connection conn, + ReplicationPeerManager peerManager) { + super("ReplicationBarrierCleaner", stopper, conf.getInt(REPLICATION_BARRIER_CLEANER_INTERVAL, + DEFAULT_REPLICATION_BARRIER_CLEANER_INTERVAL)); + this.conn = conn; + this.peerManager = peerManager; + } + + @Override + protected void chore() { + long totalRows = 0; + long cleanedRows = 0; + long deletedRows = 0; + long deletedBarriers = 0; + TableName tableName = null; + List<String> peerIds = null; + try (Table metaTable = conn.getTable(TableName.META_TABLE_NAME); + ResultScanner scanner = metaTable.getScanner( + new Scan().addFamily(HConstants.REPLICATION_BARRIER_FAMILY).readAllVersions())) { + for (;;) { + Result result = scanner.next(); + if (result == null) { + break; + } + totalRows++; + long[] barriers = MetaTableAccessor.getReplicationBarriers(result); + if (barriers.length == 0) { + continue; + } + byte[] regionName = result.getRow(); + TableName tn = RegionInfo.getTable(regionName); + if (!tn.equals(tableName)) { + tableName = tn; + peerIds = peerManager.getSerialPeerIdsBelongsTo(tableName); + } + if (peerIds.isEmpty()) { + // no serial replication, only keep the newest barrier + Cell cell = result.getColumnLatestCell(HConstants.REPLICATION_BARRIER_FAMILY, + HConstants.SEQNUM_QUALIFIER); + metaTable.delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY, + cell.getTimestamp() - 1)); + cleanedRows++; + deletedBarriers += barriers.length - 1; + continue; + } + String encodedRegionName = RegionInfo.encodeRegionName(regionName); + long pushedSeqId = Long.MAX_VALUE; + for (String peerId : peerIds) { + pushedSeqId = Math.min(pushedSeqId, + peerManager.getQueueStorage().getLastSequenceId(encodedRegionName, peerId)); + } + int index = Arrays.binarySearch(barriers, pushedSeqId); + if (index == -1) { + // before the first barrier, usually this should not happen but anyway let's add a check + // for it. + continue; + } + if (index < 0) { + index = -index - 1; + } else { + index++; + } + // A special case for merged/split region, where we are in the last closed range and the + // pushedSeqId is the last barrier minus 1. + if (index == barriers.length - 1 && pushedSeqId == barriers[barriers.length - 1] - 1) { + // check if the region has already been removed, i.e., no catalog family + if (!metaTable.exists(new Get(regionName).addFamily(HConstants.CATALOG_FAMILY))) { + metaTable + .delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY)); + deletedRows++; + deletedBarriers += barriers.length; + continue; + } + } + // the barriers before 'index - 1' (exclusive) can be safely removed. See the algorithm in + // SerialReplicationChecker for more details.
+ if (index - 1 > 0) { + List<Cell> cells = result.getColumnCells(HConstants.REPLICATION_BARRIER_FAMILY, + HConstants.SEQNUM_QUALIFIER); + // All barriers before this cell(exclusive) can be removed + Cell cell = cells.get(cells.size() - index); + metaTable.delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY, + cell.getTimestamp() - 1)); + cleanedRows++; + deletedBarriers += index - 1; + } + } + } catch (ReplicationException | IOException e) { + LOG.warn("Failed to clean up replication barrier", e); + } + if (totalRows > 0) { + LOG.info( + "Cleanup replication barriers: " + + "totalRows {}, cleanedRows {}, deletedRows {}, deletedBarriers {}", + totalRows, cleanedRows, deletedRows, deletedBarriers); + } + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/644bfe36/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 19cd89d..1e93373 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -327,6 +327,16 @@ public class ReplicationPeerManager { } } + public List<String> getSerialPeerIdsBelongsTo(TableName tableName) { + return peers.values().stream().filter(p -> p.getPeerConfig().isSerial()) + .filter(p -> ReplicationUtils.contains(p.getPeerConfig(), tableName)).map(p -> p.getPeerId()) + .collect(Collectors.toList()); + } + + public ReplicationQueueStorage getQueueStorage() { + return queueStorage; + } + public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf) throws ReplicationException { ReplicationPeerStorage peerStorage = http://git-wip-us.apache.org/repos/asf/hbase/blob/644bfe36/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java index 08c9f37..3a3200a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.TableName; @@ -53,44 +52,10 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi @Override public Entry filter(Entry entry) { - TableName tabName = entry.getKey().getTableName(); - String namespace = tabName.getNamespaceAsString(); - ReplicationPeerConfig peerConfig = this.peer.getPeerConfig(); - - if (peerConfig.replicateAllUserTables()) { - // replicate all user tables, but filter by exclude namespaces config - Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces(); - - // return null(prevent replicating) if logKey's table is in this peer's - // exclude namespaces list - if 
(excludeNamespaces != null && excludeNamespaces.contains(namespace)) { - return null; - } - + if (ReplicationUtils.contains(this.peer.getPeerConfig(), entry.getKey().getTableName())) { return entry; } else { - // Not replicate all user tables, so filter by namespaces and table-cfs config - Set<String> namespaces = peerConfig.getNamespaces(); - Map<TableName, List<String>> tableCFs = peerConfig.getTableCFsMap(); - - if (namespaces == null && tableCFs == null) { - return null; - } - - // First filter by namespaces config - // If table's namespace in peer config, all the tables data are applicable for replication - if (namespaces != null && namespaces.contains(namespace)) { - return entry; - } - - // Then filter by table-cfs config - // return null(prevent replicating) if logKey's table isn't in this peer's - // replicable tables list - if (tableCFs == null || !tableCFs.containsKey(tabName)) { - return null; - } - - return entry; + return null; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/644bfe36/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationBarrierCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationBarrierCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationBarrierCleaner.java new file mode 100644 index 0000000..671bc22 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationBarrierCleaner.java @@ -0,0 +1,293 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ +package org.apache.hadoop.hbase.master.cleaner; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertFalse; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestReplicationBarrierCleaner { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReplicationBarrierCleaner.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBarrierCleaner.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + @Rule + public final TestName name = new TestName(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + UTIL.startMiniCluster(1); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @After + public void tearDown() throws IOException { + try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME); + ResultScanner scanner = table.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY) + .addFamily(HConstants.REPLICATION_BARRIER_FAMILY).setFilter(new FirstKeyOnlyFilter()))) { + for (;;) { + Result result = scanner.next(); + if (result == null) { + break; + } + TableName tableName = RegionInfo.getTable(result.getRow()); + if (!tableName.isSystemTable()) { + table.delete(new Delete(result.getRow())); + } + } + } + } + + private ReplicationPeerManager create(ReplicationQueueStorage queueStorage, + List<String> firstPeerIds, @SuppressWarnings("unchecked") List<String>...
peerIds) { + ReplicationPeerManager peerManager = mock(ReplicationPeerManager.class); + if (queueStorage != null) { + when(peerManager.getQueueStorage()).thenReturn(queueStorage); + } + if (peerIds.length == 0) { + when(peerManager.getSerialPeerIdsBelongsTo(any(TableName.class))).thenReturn(firstPeerIds); + } else { + when(peerManager.getSerialPeerIdsBelongsTo(any(TableName.class))).thenReturn(firstPeerIds, + peerIds); + } + return peerManager; + } + + private ReplicationQueueStorage create(Long lastPushedSeqId, Long... lastPushedSeqIds) + throws ReplicationException { + ReplicationQueueStorage queueStorage = mock(ReplicationQueueStorage.class); + if (lastPushedSeqIds.length == 0) { + when(queueStorage.getLastSequenceId(anyString(), anyString())).thenReturn(lastPushedSeqId); + } else { + when(queueStorage.getLastSequenceId(anyString(), anyString())).thenReturn(lastPushedSeqId, + lastPushedSeqIds); + } + return queueStorage; + } + + private ReplicationBarrierCleaner create(ReplicationPeerManager peerManager) throws IOException { + return new ReplicationBarrierCleaner(UTIL.getConfiguration(), new WarnOnlyStoppable(), + UTIL.getConnection(), peerManager); + } + + private void addBarrier(RegionInfo region, long... barriers) throws IOException { + Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime()); + for (int i = 0; i < barriers.length; i++) { + put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER, + put.getTimeStamp() - barriers.length + i, Bytes.toBytes(barriers[i])); + } + try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) { + table.put(put); + } + } + + private void fillCatalogFamily(RegionInfo region) throws IOException { + try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) { + table.put(new Put(region.getRegionName()).addColumn(HConstants.CATALOG_FAMILY, + Bytes.toBytes("whatever"), Bytes.toBytes("whatever"))); + } + } + + private void clearCatalogFamily(RegionInfo region) throws IOException { + try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) { + table.delete(new Delete(region.getRegionName()).addFamily(HConstants.CATALOG_FAMILY)); + } + } + + @Test + public void testNothing() throws IOException { + ReplicationPeerManager peerManager = mock(ReplicationPeerManager.class); + ReplicationBarrierCleaner cleaner = create(peerManager); + cleaner.chore(); + verify(peerManager, never()).getSerialPeerIdsBelongsTo(any(TableName.class)); + verify(peerManager, never()).getQueueStorage(); + } + + @Test + public void testCleanNoPeers() throws IOException { + TableName tableName1 = TableName.valueOf(name.getMethodName() + "_1"); + RegionInfo region11 = + RegionInfoBuilder.newBuilder(tableName1).setEndKey(Bytes.toBytes(1)).build(); + addBarrier(region11, 10, 20, 30, 40, 50, 60); + RegionInfo region12 = + RegionInfoBuilder.newBuilder(tableName1).setStartKey(Bytes.toBytes(1)).build(); + addBarrier(region12, 20, 30, 40, 50, 60, 70); + + TableName tableName2 = TableName.valueOf(name.getMethodName() + "_2"); + RegionInfo region21 = + RegionInfoBuilder.newBuilder(tableName2).setEndKey(Bytes.toBytes(1)).build(); + addBarrier(region21, 100, 200, 300, 400); + RegionInfo region22 = + RegionInfoBuilder.newBuilder(tableName2).setStartKey(Bytes.toBytes(1)).build(); + addBarrier(region22, 200, 300, 400, 500, 600); + + @SuppressWarnings("unchecked") + ReplicationPeerManager peerManager = + create(null, Collections.emptyList(), Collections.emptyList()); + ReplicationBarrierCleaner 
cleaner = create(peerManager); + cleaner.chore(); + + // should never call this method + verify(peerManager, never()).getQueueStorage(); + // should only be called twice although we have 4 regions to clean + verify(peerManager, times(2)).getSerialPeerIdsBelongsTo(any(TableName.class)); + + assertArrayEquals(new long[] { 60 }, + MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region11.getRegionName())); + assertArrayEquals(new long[] { 70 }, + MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region12.getRegionName())); + + assertArrayEquals(new long[] { 400 }, + MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region21.getRegionName())); + assertArrayEquals(new long[] { 600 }, + MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region22.getRegionName())); + } + + @Test + public void testDeleteBarriers() throws IOException, ReplicationException { + TableName tableName = TableName.valueOf(name.getMethodName()); + RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build(); + addBarrier(region, 10, 20, 30, 40, 50, 60); + // two peers + ReplicationQueueStorage queueStorage = create(-1L, 2L, 15L, 25L, 20L, 25L, 65L, 55L, 70L, 70L); + List<String> peerIds = Lists.newArrayList("1", "2"); + + @SuppressWarnings("unchecked") + ReplicationPeerManager peerManager = + create(queueStorage, peerIds, peerIds, peerIds, peerIds, peerIds); + ReplicationBarrierCleaner cleaner = create(peerManager); + + // before the first barrier, no deletion + cleaner.chore(); + assertArrayEquals(new long[] { 10, 20, 30, 40, 50, 60 }, + MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName())); + + // in the first range, still no deletion + cleaner.chore(); + assertArrayEquals(new long[] { 10, 20, 30, 40, 50, 60 }, + MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName())); + + // in the second range, 10 is deleted + cleaner.chore(); + assertArrayEquals(new long[] { 20, 30, 40, 50, 60 }, + MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName())); + + // between 50 and 60, so the barriers before 50 will be deleted + cleaner.chore(); + assertArrayEquals(new long[] { 50, 60 }, + MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName())); + + // in the last open range, 50 is deleted + cleaner.chore(); + assertArrayEquals(new long[] { 60 }, + MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName())); + } + + @Test + public void testDeleteRowForDeletedRegion() throws IOException, ReplicationException { + TableName tableName = TableName.valueOf(name.getMethodName()); + RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build(); + addBarrier(region, 40, 50, 60); + fillCatalogFamily(region); + + ReplicationQueueStorage queueStorage = create(59L); + @SuppressWarnings("unchecked") + ReplicationPeerManager peerManager = create(queueStorage, Lists.newArrayList("1")); + ReplicationBarrierCleaner cleaner = create(peerManager); + + // we have something in catalog family, so only delete 40 + cleaner.chore(); + assertArrayEquals(new long[] { 50, 60 }, + MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName())); + + // No catalog family, then we should remove the whole row + clearCatalogFamily(region); + cleaner.chore(); + try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) { + assertFalse(table + .exists(new
Get(region.getRegionName()).addFamily(HConstants.REPLICATION_BARRIER_FAMILY))); + } + } + + private static class WarnOnlyStoppable implements Stoppable { + @Override + public void stop(String why) { + LOG.warn("TestReplicationBarrierCleaner received stop, ignoring. Reason: " + why); + } + + @Override + public boolean isStopped() { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/644bfe36/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java index 58e9543..29749bd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java @@ -157,7 +157,7 @@ public class TestSerialReplicationChecker { } for (int i = 0; i < barriers.length; i++) { put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER, - put.getTimeStamp() - i, Bytes.toBytes(barriers[i])); + put.getTimeStamp() - barriers.length + i, Bytes.toBytes(barriers[i])); } try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) { table.put(put);
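To make the chore() logic above easier to follow: the pruning decision is a binary search over a region's sorted barrier list for the smallest sequence id already pushed by every serial peer. Below is a minimal, self-contained sketch of just that index arithmetic; the class and method names are hypothetical (not part of the patch), and the expected outputs mirror the rounds of testDeleteBarriers.

import java.util.Arrays;

/**
 * Hypothetical sketch of the index arithmetic in ReplicationBarrierCleaner.chore():
 * given a region's sorted barriers and the smallest sequence id already pushed by
 * all serial peers, compute how many leading barriers are safe to delete.
 */
public final class BarrierPruneSketch {

  /** Returns the count of deletable leading barriers, or -1 if pushedSeqId precedes them all. */
  static int deletableBarriers(long[] barriers, long pushedSeqId) {
    int index = Arrays.binarySearch(barriers, pushedSeqId);
    if (index == -1) {
      // pushedSeqId is before the first barrier: no range has been fully pushed yet.
      return -1;
    }
    // Normalize to the index of the range whose lower barrier must still be kept.
    index = index < 0 ? -index - 1 : index + 1;
    // Everything strictly before 'index - 1' is no longer needed as a range boundary.
    return Math.max(index - 1, 0);
  }

  public static void main(String[] args) {
    long[] barriers = { 10, 20, 30, 40, 50, 60 };
    System.out.println(deletableBarriers(barriers, -1)); // -1: before the first barrier
    System.out.println(deletableBarriers(barriers, 15)); // 0: still in the first range
    System.out.println(deletableBarriers(barriers, 20)); // 1: barrier 10 can go
    System.out.println(deletableBarriers(barriers, 55)); // 4: barriers 10..40 can go, {50, 60} remain
  }
}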
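The final hunk, in TestSerialReplicationChecker, fixes a helper that writes several barriers into the same row and qualifier: since all barriers share one column, their order is carried entirely by cell timestamps, and the cleaner's no-serial-peer path reads the newest barrier via getColumnLatestCell(). The old formula put.getTimeStamp() - i gave the first barrier the newest timestamp; the fixed formula put.getTimeStamp() - barriers.length + i gives it to the last. A small runnable sketch of the two formulas (class name and values are illustrative only):

/** Illustrative comparison of the old and fixed timestamp assignment. */
public final class BarrierTimestampSketch {
  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    long[] barriers = { 10, 20, 30 };
    for (int i = 0; i < barriers.length; i++) {
      long buggyTs = now - i;                   // old: first barrier gets the NEWEST timestamp
      long fixedTs = now - barriers.length + i; // fixed: last barrier gets the newest timestamp
      System.out.println("barrier " + barriers[i] + ": buggy ts offset " + (buggyTs - now)
          + ", fixed ts offset " + (fixedTs - now));
    }
    // With the fix, the latest cell in the rep_barrier column holds the largest barrier,
    // which is what ReplicationBarrierCleaner relies on in its "no serial peers" path.
  }
}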