HBASE-20378 Provide an hbck option to clean up replication barriers for a table

Signed-off-by: zhangduo <zhang...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/87f5b5f3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/87f5b5f3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/87f5b5f3

Branch: refs/heads/HBASE-19064
Commit: 87f5b5f3411d96c31b4cb61b9a57ced22be91d1f
Parents: 6225b4a
Author: jingyuntian <tianjy1...@gmail.com>
Authored: Sat Apr 28 11:34:29 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Fri May 4 15:27:33 2018 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/util/HBaseFsck.java | 131 ++++++++++--
 .../TestHBaseFsckCleanReplicationBarriers.java  | 205 +++++++++++++++++++
 .../hadoop/hbase/util/hbck/HbckTestingUtil.java |  20 +-
 3 files changed, 336 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
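
A quick usage sketch, inferred from the usage text this patch adds (the exact
launcher wrapper may differ per install):

    $ hbase hbck -cleanReplicationBarrier <tableName>

The option is rejected unless a table name follows it. It clears the
last-pushed sequence ids recorded for the table's regions under each serial
peer that covers the table, then deletes the table's rep_barrier cells from
hbase:meta.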


http://git-wip-us.apache.org/repos/asf/hbase/blob/87f5b5f3/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index 9fcf320..6d9ca9a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -58,6 +58,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -85,6 +86,7 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ClusterConnection;
@@ -99,7 +101,9 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
@@ -115,6 +119,10 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
@@ -268,11 +276,13 @@ public class HBaseFsck extends Configured implements Closeable {
   private boolean fixHFileLinks = false; // fix lingering HFileLinks
   private boolean fixEmptyMetaCells = false; // fix (remove) empty REGIONINFO_QUALIFIER rows
   private boolean fixReplication = false; // fix undeleted replication queues for removed peer
+  private boolean cleanReplicationBarrier = false; // clean replication barriers of a table
   private boolean fixAny = false; // Set to true if any of the fix is required.
 
   // limit checking/fixes to listed tables, if empty attempt to check/fix all
   // hbase:meta are always checked
   private Set<TableName> tablesIncluded = new HashSet<>();
+  private TableName cleanReplicationBarrierTable;
   private int maxMerge = DEFAULT_MAX_MERGE; // maximum number of overlapping regions to merge
   // maximum number of overlapping regions to sideline
   private int maxOverlapsToSideline = DEFAULT_OVERLAPS_TO_SIDELINE;
@@ -786,6 +796,8 @@ public class HBaseFsck extends Configured implements Closeable {
 
     checkAndFixReplication();
 
+    cleanReplicationBarrier();
+
     // Remove the hbck znode
     cleanupHbckZnode();
 
@@ -4118,14 +4130,13 @@ public class HBaseFsck extends Configured implements Closeable {
     enum ERROR_CODE {
       UNKNOWN, NO_META_REGION, NULL_META_REGION, NO_VERSION_FILE, NOT_IN_META_HDFS, NOT_IN_META,
       NOT_IN_META_OR_DEPLOYED, NOT_IN_HDFS_OR_DEPLOYED, NOT_IN_HDFS, SERVER_DOES_NOT_MATCH_META,
-      NOT_DEPLOYED,
-      MULTI_DEPLOYED, SHOULD_NOT_BE_DEPLOYED, MULTI_META_REGION, RS_CONNECT_FAILURE,
+      NOT_DEPLOYED, MULTI_DEPLOYED, SHOULD_NOT_BE_DEPLOYED, MULTI_META_REGION, RS_CONNECT_FAILURE,
       FIRST_REGION_STARTKEY_NOT_EMPTY, LAST_REGION_ENDKEY_NOT_EMPTY, DUPE_STARTKEYS,
       HOLE_IN_REGION_CHAIN, OVERLAP_IN_REGION_CHAIN, REGION_CYCLE, DEGENERATE_REGION,
       ORPHAN_HDFS_REGION, LINGERING_SPLIT_PARENT, NO_TABLEINFO_FILE, LINGERING_REFERENCE_HFILE,
       LINGERING_HFILELINK, WRONG_USAGE, EMPTY_META_CELL, EXPIRED_TABLE_LOCK, BOUNDARIES_ERROR,
       ORPHAN_TABLE_STATE, NO_TABLE_STATE, UNDELETED_REPLICATION_QUEUE, DUPE_ENDKEYS,
-      UNSUPPORTED_OPTION
+      UNSUPPORTED_OPTION, INVALID_TABLE
     }
     void clear();
     void report(String message);
@@ -4557,6 +4568,10 @@ public class HBaseFsck extends Configured implements Closeable {
     fixAny |= shouldFix;
   }
 
+  public void setCleanReplicationBarrier(boolean shouldClean) {
+    cleanReplicationBarrier = shouldClean;
+  }
+
   /**
    * Check if we should rerun fsck again. This checks if we've tried to
    * fix something and we should rerun fsck tool again.
@@ -4567,7 +4582,7 @@ public class HBaseFsck extends Configured implements Closeable {
     rerun = true;
   }
 
-  boolean shouldRerun() {
+  public boolean shouldRerun() {
     return rerun;
   }
 
@@ -4848,7 +4863,11 @@ public class HBaseFsck extends Configured implements Closeable {
         "-fixHdfsOrphans -fixHdfsOverlaps -fixVersionFile -sidelineBigOverlaps -fixReferenceFiles" +
         "-fixHFileLinks");
     out.println("   -repairHoles      Shortcut for -fixAssignments -fixMeta -fixHdfsHoles");
-
+    out.println("");
+    out.println(" Replication options");
+    out.println("   -fixReplication   Deletes replication queues for removed peers");
+    out.println("   -cleanReplicationBarrier [tableName] clean the replication barriers " +
+        "of a specified table, tableName is required");
     out.flush();
     errors.reportError(ERROR_CODE.WRONG_USAGE, sw.toString());
 
@@ -4908,13 +4927,12 @@ public class HBaseFsck extends Configured implements Closeable {
           return printUsageAndExit();
         }
         try {
-          long timelag = Long.parseLong(args[i+1]);
+          long timelag = Long.parseLong(args[++i]);
           setTimeLag(timelag);
         } catch (NumberFormatException e) {
           errors.reportError(ERROR_CODE.WRONG_USAGE, "-timelag needs a numeric 
value.");
           return printUsageAndExit();
         }
-        i++;
       } else if (cmd.equals("-sleepBeforeRerun")) {
         if (i == args.length - 1) {
           errors.reportError(ERROR_CODE.WRONG_USAGE,
@@ -4922,19 +4940,17 @@ public class HBaseFsck extends Configured implements Closeable {
           return printUsageAndExit();
         }
         try {
-          sleepBeforeRerun = Long.parseLong(args[i+1]);
+          sleepBeforeRerun = Long.parseLong(args[++i]);
         } catch (NumberFormatException e) {
           errors.reportError(ERROR_CODE.WRONG_USAGE, "-sleepBeforeRerun needs 
a numeric value.");
           return printUsageAndExit();
         }
-        i++;
       } else if (cmd.equals("-sidelineDir")) {
         if (i == args.length - 1) {
           errors.reportError(ERROR_CODE.WRONG_USAGE, "HBaseFsck: -sidelineDir 
needs a value.");
           return printUsageAndExit();
         }
-        i++;
-        setSidelineDir(args[i]);
+        setSidelineDir(args[++i]);
       } else if (cmd.equals("-fix")) {
         errors.reportError(ERROR_CODE.WRONG_USAGE,
           "This option is deprecated, please use  -fixAssignments instead.");
@@ -5004,14 +5020,13 @@ public class HBaseFsck extends Configured implements Closeable {
           return printUsageAndExit();
         }
         try {
-          int maxOverlapsToSideline = Integer.parseInt(args[i+1]);
+          int maxOverlapsToSideline = Integer.parseInt(args[++i]);
           setMaxOverlapsToSideline(maxOverlapsToSideline);
         } catch (NumberFormatException e) {
           errors.reportError(ERROR_CODE.WRONG_USAGE,
             "-maxOverlapsToSideline needs a numeric value argument.");
           return printUsageAndExit();
         }
-        i++;
       } else if (cmd.equals("-maxMerge")) {
         if (i == args.length - 1) {
           errors.reportError(ERROR_CODE.WRONG_USAGE,
@@ -5019,14 +5034,13 @@ public class HBaseFsck extends Configured implements Closeable {
           return printUsageAndExit();
         }
         try {
-          int maxMerge = Integer.parseInt(args[i+1]);
+          int maxMerge = Integer.parseInt(args[++i]);
           setMaxMerge(maxMerge);
         } catch (NumberFormatException e) {
           errors.reportError(ERROR_CODE.WRONG_USAGE,
             "-maxMerge needs a numeric value argument.");
           return printUsageAndExit();
         }
-        i++;
       } else if (cmd.equals("-summary")) {
         setSummary();
       } else if (cmd.equals("-metaonly")) {
@@ -5035,6 +5049,12 @@ public class HBaseFsck extends Configured implements Closeable {
         setRegionBoundariesCheck();
       } else if (cmd.equals("-fixReplication")) {
         setFixReplication(true);
+      } else if (cmd.equals("-cleanReplicationBarrier")) {
+        setCleanReplicationBarrier(true);
+        if(args[++i].startsWith("-")){
+          printUsageAndExit();
+        }
+        setCleanReplicationBarrierTable(args[i]);
       } else if (cmd.startsWith("-")) {
         errors.reportError(ERROR_CODE.WRONG_USAGE, "Unrecognized option:" + 
cmd);
         return printUsageAndExit();
@@ -5120,7 +5140,7 @@ public class HBaseFsck extends Configured implements Closeable {
     boolean result = true;
     String hbaseServerVersion = status.getHBaseVersion();
     Object[] versionComponents = VersionInfo.getVersionComponents(hbaseServerVersion);
-    if (versionComponents[0] instanceof Integer && ((Integer)versionComponents[0]) >= 2) {
+    if (versionComponents[0] instanceof Integer && ((Integer) versionComponents[0]) >= 2) {
       // Process command-line args.
       for (String arg : args) {
         if (unsupportedOptionsInV2.contains(arg)) {
@@ -5134,6 +5154,85 @@ public class HBaseFsck extends Configured implements Closeable {
     return result;
   }
 
+  public void setCleanReplicationBarrierTable(String cleanReplicationBarrierTable) {
+    this.cleanReplicationBarrierTable = TableName.valueOf(cleanReplicationBarrierTable);
+  }
+
+  public void cleanReplicationBarrier() throws IOException {
+    if (!cleanReplicationBarrier || cleanReplicationBarrierTable == null) {
+      return;
+    }
+    if (cleanReplicationBarrierTable.isSystemTable()) {
+      errors.reportError(ERROR_CODE.INVALID_TABLE,
+        "invalid table: " + cleanReplicationBarrierTable);
+      return;
+    }
+
+    boolean isGlobalScope = false;
+    try {
+      isGlobalScope = admin.getDescriptor(cleanReplicationBarrierTable).hasGlobalReplicationScope();
+    } catch (TableNotFoundException e) {
+      LOG.info("we may need to clean some erroneous data due to bugs");
+    }
+
+    if (isGlobalScope) {
+      errors.reportError(ERROR_CODE.INVALID_TABLE,
+        "table's replication scope is global: " + 
cleanReplicationBarrierTable);
+      return;
+    }
+    List<byte[]> regionNames = new ArrayList<>();
+    Scan barrierScan = new Scan();
+    barrierScan.setCaching(100);
+    barrierScan.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
+    barrierScan
+        .withStartRow(MetaTableAccessor.getTableStartRowForMeta(cleanReplicationBarrierTable,
+          MetaTableAccessor.QueryType.REGION))
+        .withStopRow(MetaTableAccessor.getTableStopRowForMeta(cleanReplicationBarrierTable,
+          MetaTableAccessor.QueryType.REGION));
+    Result result;
+    try (ResultScanner scanner = meta.getScanner(barrierScan)) {
+      while ((result = scanner.next()) != null) {
+        regionNames.add(result.getRow());
+      }
+    }
+    if (regionNames.isEmpty()) {
+      errors.reportError(ERROR_CODE.INVALID_TABLE,
+        "there are no replication barriers for table: " + cleanReplicationBarrierTable);
+      return;
+    }
+    ReplicationQueueStorage queueStorage =
+        ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
+    List<ReplicationPeerDescription> peerDescriptions = admin.listReplicationPeers();
+    if (peerDescriptions != null && peerDescriptions.size() > 0) {
+      List<String> peers = peerDescriptions.stream()
+          .filter(peerConfig -> ReplicationUtils.contains(peerConfig.getPeerConfig(),
+            cleanReplicationBarrierTable))
+          .map(peerConfig -> peerConfig.getPeerId()).collect(Collectors.toList());
+      try {
+        List<String> batch = new ArrayList<>();
+        for (String peer : peers) {
+          for (byte[] regionName : regionNames) {
+            batch.add(RegionInfo.encodeRegionName(regionName));
+            if (batch.size() % 100 == 0) {
+              queueStorage.removeLastSequenceIds(peer, batch);
+              batch.clear();
+            }
+          }
+          if (batch.size() > 0) {
+            queueStorage.removeLastSequenceIds(peer, batch);
+            batch.clear();
+          }
+        }
+      } catch (ReplicationException re) {
+        throw new IOException(re);
+      }
+    }
+    for (byte[] regionName : regionNames) {
+      meta.delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY));
+    }
+    setShouldRerun();
+  }
+
   /**
    * ls -r for debugging purposes
    */
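
For reference, a minimal programmatic driver for the new cleanup, modeled on
the HbckTestingUtil.cleanReplicationBarrier() helper added at the end of this
patch; the driver class itself is illustrative, not part of the change:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.util.HBaseFsck;

    public class CleanBarrierDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // null executor: the same construction the test utility uses
        HBaseFsck hbck = new HBaseFsck(conf, null);
        hbck.setCleanReplicationBarrierTable(args[0]); // table to clean
        hbck.setCleanReplicationBarrier(true);
        hbck.connect();                 // set up cluster and meta connections
        hbck.cleanReplicationBarrier(); // scan meta, clear seq ids, drop barriers
        // shouldRerun() flips to true only if barriers were actually removed
        System.out.println("cleaned: " + hbck.shouldRerun());
      }
    }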

http://git-wip-us.apache.org/repos/asf/hbase/blob/87f5b5f3/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java
new file mode 100644
index 0000000..375f2ed
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestHBaseFsckCleanReplicationBarriers {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestHBaseFsckCleanReplicationBarriers.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static String PEER_1 = "1", PEER_2 = "2";
+
+  private static ReplicationQueueStorage QUEUE_STORAGE;
+
+  private static String WAL_FILE_NAME = "test.wal";
+
+  private static String TABLE_NAME = "test";
+
+  private static String COLUMN_FAMILY = "info";
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.startMiniCluster(1);
+    QUEUE_STORAGE = ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getZooKeeperWatcher(),
+      UTIL.getConfiguration());
+    createPeer();
+    QUEUE_STORAGE.addWAL(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), PEER_1,
+      WAL_FILE_NAME);
+    QUEUE_STORAGE.addWAL(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), PEER_2,
+      WAL_FILE_NAME);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testCleanReplicationBarrierWithNonExistTable()
+      throws ClassNotFoundException, IOException {
+    TableName tableName = TableName.valueOf(TABLE_NAME + "_non");
+    boolean cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName);
+    assertFalse(cleaned);
+  }
+
+  @Test
+  public void testCleanReplicationBarrierWithDeletedTable() throws Exception {
+    TableName tableName = TableName.valueOf(TABLE_NAME + "_deleted");
+    List<RegionInfo> regionInfos = new ArrayList<>();
+    // only write some barriers into meta table
+
+    for (int i = 0; i < 110; i++) {
+      RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(i))
+          .setEndKey(Bytes.toBytes(i + 1)).build();
+      regionInfos.add(regionInfo);
+      addStateAndBarrier(regionInfo, RegionState.State.OPEN, 10, 100);
+      updatePushedSeqId(regionInfo, 10);
+      assertEquals("check if there is lastPushedId", 10,
+        QUEUE_STORAGE.getLastSequenceId(regionInfo.getEncodedName(), PEER_1));
+      assertEquals("check if there is lastPushedId", 10,
+        QUEUE_STORAGE.getLastSequenceId(regionInfo.getEncodedName(), PEER_2));
+    }
+    Scan barrierScan = new Scan();
+    barrierScan.setCaching(100);
+    barrierScan.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
+    barrierScan
+        .withStartRow(
+          MetaTableAccessor.getTableStartRowForMeta(tableName, MetaTableAccessor.QueryType.REGION))
+        .withStopRow(
+          MetaTableAccessor.getTableStopRowForMeta(tableName, MetaTableAccessor.QueryType.REGION));
+    Result result;
+    try (ResultScanner scanner =
+        MetaTableAccessor.getMetaHTable(UTIL.getConnection()).getScanner(barrierScan)) {
+      while ((result = scanner.next()) != null) {
+        assertTrue(MetaTableAccessor.getReplicationBarriers(result).length > 0);
+      }
+    }
+    boolean cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName);
+    assertTrue(cleaned);
+    for (RegionInfo regionInfo : regionInfos) {
+      assertEquals("check if there is lastPushedId", -1,
+        QUEUE_STORAGE.getLastSequenceId(regionInfo.getEncodedName(), PEER_1));
+      assertEquals("check if there is lastPushedId", -1,
+        QUEUE_STORAGE.getLastSequenceId(regionInfo.getEncodedName(), PEER_2));
+    }
+    cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName);
+    assertFalse(cleaned);
+    for (RegionInfo region : regionInfos) {
+      assertEquals(0, MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(),
+        region.getRegionName()).length);
+    }
+  }
+
+  @Test
+  public void testCleanReplicationBarrierWithExistTable() throws Exception {
+    TableName tableName = TableName.valueOf(TABLE_NAME);
+    String cf = COLUMN_FAMILY;
+    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf)).build())
+        .setReplicationScope(HConstants.REPLICATION_SCOPE_LOCAL).build();
+    UTIL.createTable(tableDescriptor, Bytes.split(Bytes.toBytes(1), Bytes.toBytes(256), 123));
+    assertTrue(UTIL.getAdmin().getRegions(tableName).size() > 0);
+    for (RegionInfo region : UTIL.getAdmin().getRegions(tableName)) {
+      addStateAndBarrier(region, RegionState.State.OFFLINE, 10, 100);
+      updatePushedSeqId(region, 10);
+      assertEquals("check if there is lastPushedId", 10,
+        QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_1));
+      assertEquals("check if there is lastPushedId", 10,
+        QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_2));
+    }
+    boolean cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName);
+    assertTrue(cleaned);
+    for (RegionInfo region : UTIL.getAdmin().getRegions(tableName)) {
+      assertEquals("check if there is lastPushedId", -1,
+        QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_1));
+      assertEquals("check if there is lastPushedId", -1,
+        QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_2));
+    }
+    cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName);
+    assertFalse(cleaned);
+    for (RegionInfo region : UTIL.getAdmin().getRegions(tableName)) {
+      assertEquals(0, MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(),
+        region.getRegionName()).length);
+    }
+  }
+
+  public static void createPeer() throws IOException {
+    ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
+        .setClusterKey(UTIL.getClusterKey()).setSerial(true).build();
+    UTIL.getAdmin().addReplicationPeer(PEER_1, rpc);
+    UTIL.getAdmin().addReplicationPeer(PEER_2, rpc);
+  }
+
+  private void addStateAndBarrier(RegionInfo region, RegionState.State state, long... barriers)
+      throws IOException {
+    Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
+    if (state != null) {
+      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
+        Bytes.toBytes(state.name()));
+    }
+    for (int i = 0; i < barriers.length; i++) {
+      put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
+        put.getTimeStamp() - barriers.length + i, Bytes.toBytes(barriers[i]));
+    }
+    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
+      table.put(put);
+    }
+  }
+
+  private void updatePushedSeqId(RegionInfo region, long seqId) throws ReplicationException {
+    QUEUE_STORAGE.setWALPosition(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(),
+      PEER_1, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), seqId));
+    QUEUE_STORAGE.setWALPosition(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(),
+      PEER_2, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), seqId));
+  }
+}
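
HBaseFsck.cleanReplicationBarrier() above flushes queueStorage.removeLastSequenceIds()
every 100 encoded region names so that no single storage call carries an
unbounded batch. A generic sketch of that chunked-flush pattern, with a
Consumer standing in for the removeLastSequenceIds() call (names here are
illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;

    public final class ChunkedFlush {
      // Hand items to `flush` in batches of at most chunkSize, plus one
      // trailing partial batch, mirroring the batching in cleanReplicationBarrier().
      static <T> void flushInChunks(Iterable<T> items, int chunkSize, Consumer<List<T>> flush) {
        List<T> batch = new ArrayList<>(chunkSize);
        for (T item : items) {
          batch.add(item);
          if (batch.size() == chunkSize) { // full chunk: flush and reset
            flush.accept(new ArrayList<>(batch));
            batch.clear();
          }
        }
        if (!batch.isEmpty()) { // flush the remainder
          flush.accept(batch);
        }
      }
    }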

http://git-wip-us.apache.org/repos/asf/hbase/blob/87f5b5f3/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
index 99e4f08..1808b5e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.util.hbck;
 
 import static org.junit.Assert.assertEquals;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -40,14 +41,14 @@ public class HbckTestingUtil {
 
   public static HBaseFsck doFsck(
       Configuration conf, boolean fix, TableName table) throws Exception {
-    return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table);
+    return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table);
   }
 
   public static HBaseFsck doFsck(Configuration conf, boolean fixAssignments, boolean fixMeta,
       boolean fixHdfsHoles, boolean fixHdfsOverlaps, boolean fixHdfsOrphans,
-      boolean fixTableOrphans, boolean fixVersionFile, boolean fixReferenceFiles, boolean fixHFileLinks,
-      boolean fixEmptyMetaRegionInfo, boolean fixTableLocks, boolean fixReplication,
-      TableName table) throws Exception {
+      boolean fixTableOrphans, boolean fixVersionFile, boolean fixReferenceFiles,
+      boolean fixHFileLinks, boolean fixEmptyMetaRegionInfo, boolean fixTableLocks,
+      boolean fixReplication, boolean cleanReplicationBarrier, TableName table) throws Exception {
     HBaseFsck fsck = new HBaseFsck(conf, exec);
     try {
       HBaseFsck.setDisplayFullReport(); // i.e. -details
@@ -63,6 +64,7 @@ public class HbckTestingUtil {
       fsck.setFixHFileLinks(fixHFileLinks);
       fsck.setFixEmptyMetaCells(fixEmptyMetaRegionInfo);
       fsck.setFixReplication(fixReplication);
+      fsck.setCleanReplicationBarrier(cleanReplicationBarrier);
       if (table != null) {
         fsck.includeTable(table);
       }
@@ -88,6 +90,16 @@ public class HbckTestingUtil {
     return hbck;
   }
 
+  public static boolean cleanReplicationBarrier(Configuration conf, TableName table)
+      throws IOException, ClassNotFoundException {
+    HBaseFsck hbck = new HBaseFsck(conf, null);
+    hbck.setCleanReplicationBarrierTable(table.getNameAsString());
+    hbck.setCleanReplicationBarrier(true);
+    hbck.connect();
+    hbck.cleanReplicationBarrier();
+    return hbck.shouldRerun();
+  }
+
   public static boolean inconsistencyFound(HBaseFsck fsck) throws Exception {
     List<ERROR_CODE> errs = fsck.getErrors().getErrorList();
     return (errs != null && !errs.isEmpty());
