HBASE-20378 Provide a hbck option to cleanup replication barrier for a table
Signed-off-by: zhangduo <zhang...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/87f5b5f3 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/87f5b5f3 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/87f5b5f3 Branch: refs/heads/HBASE-19064 Commit: 87f5b5f3411d96c31b4cb61b9a57ced22be91d1f Parents: 6225b4a Author: jingyuntian <tianjy1...@gmail.com> Authored: Sat Apr 28 11:34:29 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Fri May 4 15:27:33 2018 +0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/util/HBaseFsck.java | 131 ++++++++++-- .../TestHBaseFsckCleanReplicationBarriers.java | 205 +++++++++++++++++++ .../hadoop/hbase/util/hbck/HbckTestingUtil.java | 20 +- 3 files changed, 336 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/87f5b5f3/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java index 9fcf320..6d9ca9a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java @@ -58,6 +58,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.StringUtils; @@ -85,6 +86,7 @@ import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ClusterConnection; @@ -99,7 +101,9 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; @@ -115,6 +119,10 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator; @@ -268,11 +276,13 @@ public class HBaseFsck extends Configured implements Closeable { private boolean fixHFileLinks = false; // fix lingering HFileLinks private boolean fixEmptyMetaCells = false; // fix (remove) empty REGIONINFO_QUALIFIER rows private boolean fixReplication = false; // fix undeleted replication queues for removed peer + private boolean cleanReplicationBarrier = false; // clean replication barriers of a table private boolean fixAny = false; // Set to true if any of the fix is required. // limit checking/fixes to listed tables, if empty attempt to check/fix all // hbase:meta are always checked private Set<TableName> tablesIncluded = new HashSet<>(); + private TableName cleanReplicationBarrierTable; private int maxMerge = DEFAULT_MAX_MERGE; // maximum number of overlapping regions to merge // maximum number of overlapping regions to sideline private int maxOverlapsToSideline = DEFAULT_OVERLAPS_TO_SIDELINE; @@ -786,6 +796,8 @@ public class HBaseFsck extends Configured implements Closeable { checkAndFixReplication(); + cleanReplicationBarrier(); + // Remove the hbck znode cleanupHbckZnode(); @@ -4118,14 +4130,13 @@ public class HBaseFsck extends Configured implements Closeable { enum ERROR_CODE { UNKNOWN, NO_META_REGION, NULL_META_REGION, NO_VERSION_FILE, NOT_IN_META_HDFS, NOT_IN_META, NOT_IN_META_OR_DEPLOYED, NOT_IN_HDFS_OR_DEPLOYED, NOT_IN_HDFS, SERVER_DOES_NOT_MATCH_META, - NOT_DEPLOYED, - MULTI_DEPLOYED, SHOULD_NOT_BE_DEPLOYED, MULTI_META_REGION, RS_CONNECT_FAILURE, + NOT_DEPLOYED, MULTI_DEPLOYED, SHOULD_NOT_BE_DEPLOYED, MULTI_META_REGION, RS_CONNECT_FAILURE, FIRST_REGION_STARTKEY_NOT_EMPTY, LAST_REGION_ENDKEY_NOT_EMPTY, DUPE_STARTKEYS, HOLE_IN_REGION_CHAIN, OVERLAP_IN_REGION_CHAIN, REGION_CYCLE, DEGENERATE_REGION, ORPHAN_HDFS_REGION, LINGERING_SPLIT_PARENT, NO_TABLEINFO_FILE, LINGERING_REFERENCE_HFILE, LINGERING_HFILELINK, WRONG_USAGE, EMPTY_META_CELL, EXPIRED_TABLE_LOCK, BOUNDARIES_ERROR, ORPHAN_TABLE_STATE, NO_TABLE_STATE, UNDELETED_REPLICATION_QUEUE, DUPE_ENDKEYS, - UNSUPPORTED_OPTION + UNSUPPORTED_OPTION, INVALID_TABLE } void clear(); void report(String message); @@ -4557,6 +4568,10 @@ public class HBaseFsck extends Configured implements Closeable { fixAny |= shouldFix; } + public void setCleanReplicationBarrier(boolean shouldClean) { + cleanReplicationBarrier = shouldClean; + } + /** * Check if we should rerun fsck again. This checks if we've tried to * fix something and we should rerun fsck tool again. @@ -4567,7 +4582,7 @@ public class HBaseFsck extends Configured implements Closeable { rerun = true; } - boolean shouldRerun() { + public boolean shouldRerun() { return rerun; } @@ -4848,7 +4863,11 @@ public class HBaseFsck extends Configured implements Closeable { "-fixHdfsOrphans -fixHdfsOverlaps -fixVersionFile -sidelineBigOverlaps -fixReferenceFiles" + "-fixHFileLinks"); out.println(" -repairHoles Shortcut for -fixAssignments -fixMeta -fixHdfsHoles"); - + out.println(""); + out.println(" Replication options"); + out.println(" -fixReplication Deletes replication queues for removed peers"); + out.println(" -cleanReplicationBrarier [tableName] clean the replication barriers " + + "of a specified table, tableName is required"); out.flush(); errors.reportError(ERROR_CODE.WRONG_USAGE, sw.toString()); @@ -4908,13 +4927,12 @@ public class HBaseFsck extends Configured implements Closeable { return printUsageAndExit(); } try { - long timelag = Long.parseLong(args[i+1]); + long timelag = Long.parseLong(args[++i]); setTimeLag(timelag); } catch (NumberFormatException e) { errors.reportError(ERROR_CODE.WRONG_USAGE, "-timelag needs a numeric value."); return printUsageAndExit(); } - i++; } else if (cmd.equals("-sleepBeforeRerun")) { if (i == args.length - 1) { errors.reportError(ERROR_CODE.WRONG_USAGE, @@ -4922,19 +4940,17 @@ public class HBaseFsck extends Configured implements Closeable { return printUsageAndExit(); } try { - sleepBeforeRerun = Long.parseLong(args[i+1]); + sleepBeforeRerun = Long.parseLong(args[++i]); } catch (NumberFormatException e) { errors.reportError(ERROR_CODE.WRONG_USAGE, "-sleepBeforeRerun needs a numeric value."); return printUsageAndExit(); } - i++; } else if (cmd.equals("-sidelineDir")) { if (i == args.length - 1) { errors.reportError(ERROR_CODE.WRONG_USAGE, "HBaseFsck: -sidelineDir needs a value."); return printUsageAndExit(); } - i++; - setSidelineDir(args[i]); + setSidelineDir(args[++i]); } else if (cmd.equals("-fix")) { errors.reportError(ERROR_CODE.WRONG_USAGE, "This option is deprecated, please use -fixAssignments instead."); @@ -5004,14 +5020,13 @@ public class HBaseFsck extends Configured implements Closeable { return printUsageAndExit(); } try { - int maxOverlapsToSideline = Integer.parseInt(args[i+1]); + int maxOverlapsToSideline = Integer.parseInt(args[++i]); setMaxOverlapsToSideline(maxOverlapsToSideline); } catch (NumberFormatException e) { errors.reportError(ERROR_CODE.WRONG_USAGE, "-maxOverlapsToSideline needs a numeric value argument."); return printUsageAndExit(); } - i++; } else if (cmd.equals("-maxMerge")) { if (i == args.length - 1) { errors.reportError(ERROR_CODE.WRONG_USAGE, @@ -5019,14 +5034,13 @@ public class HBaseFsck extends Configured implements Closeable { return printUsageAndExit(); } try { - int maxMerge = Integer.parseInt(args[i+1]); + int maxMerge = Integer.parseInt(args[++i]); setMaxMerge(maxMerge); } catch (NumberFormatException e) { errors.reportError(ERROR_CODE.WRONG_USAGE, "-maxMerge needs a numeric value argument."); return printUsageAndExit(); } - i++; } else if (cmd.equals("-summary")) { setSummary(); } else if (cmd.equals("-metaonly")) { @@ -5035,6 +5049,12 @@ public class HBaseFsck extends Configured implements Closeable { setRegionBoundariesCheck(); } else if (cmd.equals("-fixReplication")) { setFixReplication(true); + } else if (cmd.equals("-cleanReplicationBarrier")) { + setCleanReplicationBarrier(true); + if(args[++i].startsWith("-")){ + printUsageAndExit(); + } + setCleanReplicationBarrierTable(args[i]); } else if (cmd.startsWith("-")) { errors.reportError(ERROR_CODE.WRONG_USAGE, "Unrecognized option:" + cmd); return printUsageAndExit(); @@ -5120,7 +5140,7 @@ public class HBaseFsck extends Configured implements Closeable { boolean result = true; String hbaseServerVersion = status.getHBaseVersion(); Object[] versionComponents = VersionInfo.getVersionComponents(hbaseServerVersion); - if (versionComponents[0] instanceof Integer && ((Integer)versionComponents[0]) >= 2) { + if (versionComponents[0] instanceof Integer && ((Integer) versionComponents[0]) >= 2) { // Process command-line args. for (String arg : args) { if (unsupportedOptionsInV2.contains(arg)) { @@ -5134,6 +5154,85 @@ public class HBaseFsck extends Configured implements Closeable { return result; } + public void setCleanReplicationBarrierTable(String cleanReplicationBarrierTable) { + this.cleanReplicationBarrierTable = TableName.valueOf(cleanReplicationBarrierTable); + } + + public void cleanReplicationBarrier() throws IOException { + if (!cleanReplicationBarrier || cleanReplicationBarrierTable == null) { + return; + } + if (cleanReplicationBarrierTable.isSystemTable()) { + errors.reportError(ERROR_CODE.INVALID_TABLE, + "invalid table: " + cleanReplicationBarrierTable); + return; + } + + boolean isGlobalScope = false; + try { + isGlobalScope = admin.getDescriptor(cleanReplicationBarrierTable).hasGlobalReplicationScope(); + } catch (TableNotFoundException e) { + LOG.info("we may need to clean some erroneous data due to bugs"); + } + + if (isGlobalScope) { + errors.reportError(ERROR_CODE.INVALID_TABLE, + "table's replication scope is global: " + cleanReplicationBarrierTable); + return; + } + List<byte[]> regionNames = new ArrayList<>(); + Scan barrierScan = new Scan(); + barrierScan.setCaching(100); + barrierScan.addFamily(HConstants.REPLICATION_BARRIER_FAMILY); + barrierScan + .withStartRow(MetaTableAccessor.getTableStartRowForMeta(cleanReplicationBarrierTable, + MetaTableAccessor.QueryType.REGION)) + .withStopRow(MetaTableAccessor.getTableStopRowForMeta(cleanReplicationBarrierTable, + MetaTableAccessor.QueryType.REGION)); + Result result; + try (ResultScanner scanner = meta.getScanner(barrierScan)) { + while ((result = scanner.next()) != null) { + regionNames.add(result.getRow()); + } + } + if (regionNames.size() <= 0) { + errors.reportError(ERROR_CODE.INVALID_TABLE, + "there is no barriers of this table: " + cleanReplicationBarrierTable); + return; + } + ReplicationQueueStorage queueStorage = + ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); + List<ReplicationPeerDescription> peerDescriptions = admin.listReplicationPeers(); + if (peerDescriptions != null && peerDescriptions.size() > 0) { + List<String> peers = peerDescriptions.stream() + .filter(peerConfig -> ReplicationUtils.contains(peerConfig.getPeerConfig(), + cleanReplicationBarrierTable)) + .map(peerConfig -> peerConfig.getPeerId()).collect(Collectors.toList()); + try { + List<String> batch = new ArrayList<>(); + for (String peer : peers) { + for (byte[] regionName : regionNames) { + batch.add(RegionInfo.encodeRegionName(regionName)); + if (batch.size() % 100 == 0) { + queueStorage.removeLastSequenceIds(peer, batch); + batch.clear(); + } + } + if (batch.size() > 0) { + queueStorage.removeLastSequenceIds(peer, batch); + batch.clear(); + } + } + } catch (ReplicationException re) { + throw new IOException(re); + } + } + for (byte[] regionName : regionNames) { + meta.delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY)); + } + setShouldRerun(); + } + /** * ls -r for debugging purposes */ http://git-wip-us.apache.org/repos/asf/hbase/blob/87f5b5f3/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java new file mode 100644 index 0000000..375f2ed --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; + +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestHBaseFsckCleanReplicationBarriers { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestHBaseFsckCleanReplicationBarriers.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static String PEER_1 = "1", PEER_2 = "2"; + + private static ReplicationQueueStorage QUEUE_STORAGE; + + private static String WAL_FILE_NAME = "test.wal"; + + private static String TABLE_NAME = "test"; + + private static String COLUMN_FAMILY = "info"; + + @BeforeClass + public static void setUp() throws Exception { + UTIL.startMiniCluster(1); + QUEUE_STORAGE = ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getZooKeeperWatcher(), + UTIL.getConfiguration()); + createPeer(); + QUEUE_STORAGE.addWAL(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), PEER_1, + WAL_FILE_NAME); + QUEUE_STORAGE.addWAL(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), PEER_2, + WAL_FILE_NAME); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Test + public void testCleanReplicationBarrierWithNonExistTable() + throws ClassNotFoundException, IOException { + TableName tableName = TableName.valueOf(TABLE_NAME + "_non"); + boolean cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName); + assertFalse(cleaned); + } + + @Test + public void testCleanReplicationBarrierWithDeletedTable() throws Exception { + TableName tableName = TableName.valueOf(TABLE_NAME + "_deleted"); + List<RegionInfo> regionInfos = new ArrayList<>(); + // only write some barriers into meta table + + for (int i = 0; i < 110; i++) { + RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(i)) + .setEndKey(Bytes.toBytes(i + 1)).build(); + regionInfos.add(regionInfo); + addStateAndBarrier(regionInfo, RegionState.State.OPEN, 10, 100); + updatePushedSeqId(regionInfo, 10); + assertEquals("check if there is lastPushedId", 10, + QUEUE_STORAGE.getLastSequenceId(regionInfo.getEncodedName(), PEER_1)); + assertEquals("check if there is lastPushedId", 10, + QUEUE_STORAGE.getLastSequenceId(regionInfo.getEncodedName(), PEER_2)); + } + Scan barrierScan = new Scan(); + barrierScan.setCaching(100); + barrierScan.addFamily(HConstants.REPLICATION_BARRIER_FAMILY); + barrierScan + .withStartRow( + MetaTableAccessor.getTableStartRowForMeta(tableName, MetaTableAccessor.QueryType.REGION)) + .withStopRow( + MetaTableAccessor.getTableStopRowForMeta(tableName, MetaTableAccessor.QueryType.REGION)); + Result result; + try (ResultScanner scanner = + MetaTableAccessor.getMetaHTable(UTIL.getConnection()).getScanner(barrierScan)) { + while ((result = scanner.next()) != null) { + assertTrue(MetaTableAccessor.getReplicationBarriers(result).length > 0); + } + } + boolean cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName); + assertTrue(cleaned); + for (RegionInfo regionInfo : regionInfos) { + assertEquals("check if there is lastPushedId", -1, + QUEUE_STORAGE.getLastSequenceId(regionInfo.getEncodedName(), PEER_1)); + assertEquals("check if there is lastPushedId", -1, + QUEUE_STORAGE.getLastSequenceId(regionInfo.getEncodedName(), PEER_2)); + } + cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName); + assertFalse(cleaned); + for (RegionInfo region : regionInfos) { + assertEquals(0, MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), + region.getRegionName()).length); + } + } + + @Test + public void testCleanReplicationBarrierWithExistTable() throws Exception { + TableName tableName = TableName.valueOf(TABLE_NAME); + String cf = COLUMN_FAMILY; + TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf)).build()) + .setReplicationScope(HConstants.REPLICATION_SCOPE_LOCAL).build(); + UTIL.createTable(tableDescriptor, Bytes.split(Bytes.toBytes(1), Bytes.toBytes(256), 123)); + assertTrue(UTIL.getAdmin().getRegions(tableName).size() > 0); + for (RegionInfo region : UTIL.getAdmin().getRegions(tableName)) { + addStateAndBarrier(region, RegionState.State.OFFLINE, 10, 100); + updatePushedSeqId(region, 10); + assertEquals("check if there is lastPushedId", 10, + QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_1)); + assertEquals("check if there is lastPushedId", 10, + QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_2)); + } + boolean cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName); + assertTrue(cleaned); + for (RegionInfo region : UTIL.getAdmin().getRegions(tableName)) { + assertEquals("check if there is lastPushedId", -1, + QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_1)); + assertEquals("check if there is lastPushedId", -1, + QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_2)); + } + cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName); + assertFalse(cleaned); + for (RegionInfo region : UTIL.getAdmin().getRegions(tableName)) { + assertEquals(0, MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), + region.getRegionName()).length); + } + } + + public static void createPeer() throws IOException { + ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder() + .setClusterKey(UTIL.getClusterKey()).setSerial(true).build(); + UTIL.getAdmin().addReplicationPeer(PEER_1, rpc); + UTIL.getAdmin().addReplicationPeer(PEER_2, rpc); + } + + private void addStateAndBarrier(RegionInfo region, RegionState.State state, long... barriers) + throws IOException { + Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime()); + if (state != null) { + put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER, + Bytes.toBytes(state.name())); + } + for (int i = 0; i < barriers.length; i++) { + put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER, + put.getTimeStamp() - barriers.length + i, Bytes.toBytes(barriers[i])); + } + try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) { + table.put(put); + } + } + + private void updatePushedSeqId(RegionInfo region, long seqId) throws ReplicationException { + QUEUE_STORAGE.setWALPosition(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), + PEER_1, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), seqId)); + QUEUE_STORAGE.setWALPosition(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), + PEER_2, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), seqId)); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/87f5b5f3/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java index 99e4f08..1808b5e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.util.hbck; import static org.junit.Assert.assertEquals; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -40,14 +41,14 @@ public class HbckTestingUtil { public static HBaseFsck doFsck( Configuration conf, boolean fix, TableName table) throws Exception { - return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table); + return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table); } public static HBaseFsck doFsck(Configuration conf, boolean fixAssignments, boolean fixMeta, boolean fixHdfsHoles, boolean fixHdfsOverlaps, boolean fixHdfsOrphans, - boolean fixTableOrphans, boolean fixVersionFile, boolean fixReferenceFiles, boolean fixHFileLinks, - boolean fixEmptyMetaRegionInfo, boolean fixTableLocks, boolean fixReplication, - TableName table) throws Exception { + boolean fixTableOrphans, boolean fixVersionFile, boolean fixReferenceFiles, + boolean fixHFileLinks, boolean fixEmptyMetaRegionInfo, boolean fixTableLocks, + boolean fixReplication, boolean cleanReplicationBarrier, TableName table) throws Exception { HBaseFsck fsck = new HBaseFsck(conf, exec); try { HBaseFsck.setDisplayFullReport(); // i.e. -details @@ -63,6 +64,7 @@ public class HbckTestingUtil { fsck.setFixHFileLinks(fixHFileLinks); fsck.setFixEmptyMetaCells(fixEmptyMetaRegionInfo); fsck.setFixReplication(fixReplication); + fsck.setCleanReplicationBarrier(cleanReplicationBarrier); if (table != null) { fsck.includeTable(table); } @@ -88,6 +90,16 @@ public class HbckTestingUtil { return hbck; } + public static boolean cleanReplicationBarrier(Configuration conf, TableName table) + throws IOException, ClassNotFoundException { + HBaseFsck hbck = new HBaseFsck(conf, null); + hbck.setCleanReplicationBarrierTable(table.getNameAsString()); + hbck.setCleanReplicationBarrier(true); + hbck.connect(); + hbck.cleanReplicationBarrier(); + return hbck.shouldRerun(); + } + public static boolean inconsistencyFound(HBaseFsck fsck) throws Exception { List<ERROR_CODE> errs = fsck.getErrors().getErrorList(); return (errs != null && !errs.isEmpty());