Repository: hbase
Updated Branches:
  refs/heads/branch-2 39c1ddc6e -> 74ab10c35


HBASE-20296 Remove last pushed sequence ids when removing tables from a peer


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/74ab10c3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/74ab10c3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/74ab10c3

Branch: refs/heads/branch-2
Commit: 74ab10c353968cfcfe9b7ab07101bd3bfab74044
Parents: ead569c
Author: zhangduo <zhang...@apache.org>
Authored: Sat Mar 31 20:25:13 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Mon Apr 9 15:18:44 2018 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/MetaTableAccessor.java  |  72 +++++-----
 .../replication/ReplicationQueueStorage.java    |   9 ++
 .../replication/ZKReplicationQueueStorage.java  |  15 +++
 .../master/replication/AddPeerProcedure.java    |  14 +-
 .../master/replication/ModifyPeerProcedure.java | 134 ++++++++++---------
 .../replication/UpdatePeerConfigProcedure.java  |  96 ++++++++++++-
 .../hadoop/hbase/client/TestEnableTable.java    |   4 +-
 .../TestRemoveFromSerialReplicationPeer.java    | 120 +++++++++++++++++
 8 files changed, 363 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/74ab10c3/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index 4cc46c8..0f5ef09 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -57,6 +57,8 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.RegionState.State;
@@ -682,20 +684,19 @@ public class MetaTableAccessor {
     scanMeta(connection, null, null, QueryType.ALL, v);
   }
 
-  public static void scanMetaForTableRegions(Connection connection,
-      Visitor visitor, TableName tableName) throws IOException {
+  public static void scanMetaForTableRegions(Connection connection, Visitor visitor,
+      TableName tableName) throws IOException {
     scanMeta(connection, tableName, QueryType.REGION, Integer.MAX_VALUE, visitor);
   }
 
-  public static void scanMeta(Connection connection, TableName table,
-      QueryType type, int maxRows, final Visitor visitor) throws IOException {
+  public static void scanMeta(Connection connection, TableName table, QueryType type, int maxRows,
+      final Visitor visitor) throws IOException {
     scanMeta(connection, getTableStartRowForMeta(table, type), getTableStopRowForMeta(table, type),
-        type, maxRows, visitor);
+      type, maxRows, visitor);
   }
 
-  public static void scanMeta(Connection connection,
-      @Nullable final byte[] startRow, @Nullable final byte[] stopRow,
-      QueryType type, final Visitor visitor) throws IOException {
+  public static void scanMeta(Connection connection, @Nullable final byte[] startRow,
+      @Nullable final byte[] stopRow, QueryType type, final Visitor visitor) throws IOException {
     scanMeta(connection, startRow, stopRow, type, Integer.MAX_VALUE, visitor);
   }
 
@@ -708,26 +709,19 @@ public class MetaTableAccessor {
   * @param tableName  table within which we scan
    * @param row        start scan from this row
    * @param rowLimit   max number of rows to return
-   * @throws IOException
    */
-  public static void scanMeta(Connection connection,
-      final Visitor visitor, final TableName tableName,
-      final byte[] row, final int rowLimit)
-      throws IOException {
-
+  public static void scanMeta(Connection connection, final Visitor visitor,
+      final TableName tableName, final byte[] row, final int rowLimit) throws IOException {
     byte[] startRow = null;
     byte[] stopRow = null;
     if (tableName != null) {
-      startRow =
-          getTableStartRowForMeta(tableName, QueryType.REGION);
+      startRow = getTableStartRowForMeta(tableName, QueryType.REGION);
       if (row != null) {
-        RegionInfo closestRi =
-            getClosestRegionInfo(connection, tableName, row);
-        startRow = RegionInfo
-            .createRegionName(tableName, closestRi.getStartKey(), HConstants.ZEROES, false);
+        RegionInfo closestRi = getClosestRegionInfo(connection, tableName, row);
+        startRow =
+          RegionInfo.createRegionName(tableName, closestRi.getStartKey(), HConstants.ZEROES, false);
       }
-      stopRow =
-          getTableStopRowForMeta(tableName, QueryType.REGION);
+      stopRow = getTableStopRowForMeta(tableName, QueryType.REGION);
     }
     scanMeta(connection, startRow, stopRow, QueryType.REGION, rowLimit, visitor);
   }
@@ -743,11 +737,16 @@ public class MetaTableAccessor {
    * @param type scanned part of meta
    * @param maxRows maximum rows to return
    * @param visitor Visitor invoked against each row.
-   * @throws IOException
    */
   public static void scanMeta(Connection connection, @Nullable final byte[] startRow,
       @Nullable final byte[] stopRow, QueryType type, int maxRows, final Visitor visitor)
       throws IOException {
+    scanMeta(connection, startRow, stopRow, type, null, maxRows, visitor);
+  }
+
+  private static void scanMeta(Connection connection, @Nullable final byte[] startRow,
+      @Nullable final byte[] stopRow, QueryType type, @Nullable Filter filter, int maxRows,
+      final Visitor visitor) throws IOException {
     int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE;
     Scan scan = getMetaScan(connection, rowUpperLimit);
 
@@ -760,13 +759,14 @@ public class MetaTableAccessor {
     if (stopRow != null) {
       scan.withStopRow(stopRow);
     }
+    if (filter != null) {
+      scan.setFilter(filter);
+    }
 
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Scanning META"
-          + " starting at row=" + Bytes.toStringBinary(startRow)
-          + " stopping at row=" + Bytes.toStringBinary(stopRow)
-          + " for max=" + rowUpperLimit
-          + " with caching=" + scan.getCaching());
+      LOG.trace("Scanning META" + " starting at row=" + 
Bytes.toStringBinary(startRow) +
+        " stopping at row=" + Bytes.toStringBinary(stopRow) + " for max=" + 
rowUpperLimit +
+        " with caching=" + scan.getCaching());
     }
 
     int currentRow = 0;
@@ -1973,7 +1973,7 @@ public class MetaTableAccessor {
     byte[] value = getParentsBytes(parents);
     put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
       .setFamily(HConstants.REPLICATION_BARRIER_FAMILY).setQualifier(REPLICATION_PARENT_QUALIFIER)
-      .setTimestamp(put.getTimeStamp()).setType(Type.Put).setValue(value).build());
+      .setTimestamp(put.getTimestamp()).setType(Type.Put).setValue(value).build());
   }
 
   private static Put makePutForReplicationBarrier(RegionInfo regionInfo, long openSeqNum, long ts)
@@ -1988,7 +1988,7 @@ public class MetaTableAccessor {
       .setRow(put.getRow())
       .setFamily(HConstants.REPLICATION_BARRIER_FAMILY)
       .setQualifier(HConstants.SEQNUM_QUALIFIER)
-      .setTimestamp(put.getTimeStamp())
+      .setTimestamp(put.getTimestamp())
       .setType(Type.Put)
       .setValue(Bytes.toBytes(openSeqNum))
       .build());
@@ -2128,6 +2128,18 @@ public class MetaTableAccessor {
     return list;
   }
 
+  public static List<String> 
getTableEncodedRegionNamesForSerialReplication(Connection conn,
+      TableName tableName) throws IOException {
+    List<String> list = new ArrayList<>();
+    scanMeta(conn, getTableStartRowForMeta(tableName, QueryType.REPLICATION),
+      getTableStopRowForMeta(tableName, QueryType.REPLICATION), 
QueryType.REPLICATION,
+      new FirstKeyOnlyFilter(), Integer.MAX_VALUE, r -> {
+        list.add(RegionInfo.encodeRegionName(r.getRow()));
+        return true;
+      });
+    return list;
+  }
+
   private static void debugLogMutations(List<? extends Mutation> mutations) 
throws IOException {
     if (!METALOG.isDebugEnabled()) {
       return;

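For reference, a minimal sketch of calling the new meta helper from client-side code; the class name and the `conn`/`tn` parameters are illustrative stand-ins, not part of this patch:

  import java.io.IOException;
  import java.util.List;
  import org.apache.hadoop.hbase.MetaTableAccessor;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Connection;

  public class SerialReplicationMetaExample {
    // Returns the encoded names of all regions of the given table that still
    // have replication (barrier) state recorded in hbase:meta; the scan uses
    // FirstKeyOnlyFilter, so only row keys are transferred.
    static List<String> encodedRegionNames(Connection conn, TableName tn) throws IOException {
      return MetaTableAccessor.getTableEncodedRegionNamesForSerialReplication(conn, tn);
    }
  }
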
http://git-wip-us.apache.org/repos/asf/hbase/blob/74ab10c3/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
index cd37ac2..84653ad 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
@@ -91,6 +91,15 @@ public interface ReplicationQueueStorage {
    * @param peerId peer id
    */
   void removeLastSequenceIds(String peerId) throws ReplicationException;
+
+  /**
+   * Remove the max sequence id record for the given peer and regions.
+   * @param peerId peer id
+   * @param encodedRegionNames the encoded region names
+   */
+  void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
+      throws ReplicationException;
+
   /**
   * Get the current position for a specific WAL in a given queue for a given regionserver.
    * @param serverName the name of the regionserver

http://git-wip-us.apache.org/repos/asf/hbase/blob/74ab10c3/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index 96b0b91..6d72128 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -28,6 +28,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.stream.Collectors;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -347,6 +348,20 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
   }
 
   @Override
+  public void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
+      throws ReplicationException {
+    try {
+      List<ZKUtilOp> listOfOps =
+        encodedRegionNames.stream().map(n -> getSerialReplicationRegionPeerNode(n, peerId))
+          .map(ZKUtilOp::deleteNodeFailSilent).collect(Collectors.toList());
+      ZKUtil.multiOrSequential(zookeeper, listOfOps, true);
+    } catch (KeeperException e) {
+      throw new ReplicationException("Failed to remove last sequence ids, peerId=" + peerId +
+        ", encodedRegionNames.size=" + encodedRegionNames.size(), e);
+    }
+  }
+
+  @Override
   public long getWALPosition(ServerName serverName, String queueId, String fileName)
       throws ReplicationException {
     byte[] bytes;

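The implementation above leans on deleteNodeFailSilent so the multi stays idempotent when a znode is already gone and the procedure retries. A condensed sketch of that pattern, assuming a ZKWatcher and a list of znode paths:

  import java.util.List;
  import java.util.stream.Collectors;
  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
  import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
  import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
  import org.apache.zookeeper.KeeperException;

  public class BatchedZNodeDelete {
    // Delete all given znodes in a single multi (falling back to sequential
    // operations); already-deleted nodes are silently skipped, so a retry of
    // the whole batch is safe.
    static void deleteAll(ZKWatcher zookeeper, List<String> paths) throws KeeperException {
      List<ZKUtilOp> ops =
        paths.stream().map(ZKUtilOp::deleteNodeFailSilent).collect(Collectors.toList());
      ZKUtil.multiOrSequential(zookeeper, ops, true);
    }
  }
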
http://git-wip-us.apache.org/repos/asf/hbase/blob/74ab10c3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
index 72228f6..2f2d5a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.AddPeerStateData;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
 
 /**
  * The procedure for adding a new replication peer.
@@ -57,8 +58,15 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
   }
 
   @Override
-  protected boolean reopenRegionsAfterRefresh() {
-    return true;
+  protected PeerModificationState nextStateAfterRefresh() {
+    return peerConfig.isSerial() ? PeerModificationState.SERIAL_PEER_REOPEN_REGIONS
+      : super.nextStateAfterRefresh();
+  }
+
+  @Override
+  protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
+      throws IOException, ReplicationException {
+    setLastPushedSequenceId(env, peerConfig);
   }
 
   @Override
@@ -102,7 +110,7 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
   protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
     super.serializeStateData(serializer);
     serializer.serialize(AddPeerStateData.newBuilder()
-        .setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)).setEnabled(enabled).build());
+      .setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)).setEnabled(enabled).build());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/74ab10c3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
index 2b76487..8bedeff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
@@ -18,11 +18,10 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.stream.Stream;
 import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -55,7 +54,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
 
   private static final Logger LOG = LoggerFactory.getLogger(ModifyPeerProcedure.class);
 
-  private static final int SET_LAST_SEQ_ID_BATCH_SIZE = 1000;
+  protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000;
 
   protected ModifyPeerProcedure() {
   }
@@ -93,12 +92,11 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
   }
 
   /**
-   * Implementation class can override this method. The default return value is false which means we
-   * will jump to POST_PEER_MODIFICATION and finish the procedure. If returns true, we will jump to
-   * SERIAL_PEER_REOPEN_REGIONS.
+   * Implementation class can override this method. By default we will jump to
+   * POST_PEER_MODIFICATION and finish the procedure.
    */
-  protected boolean reopenRegionsAfterRefresh() {
-    return false;
+  protected PeerModificationState nextStateAfterRefresh() {
+    return PeerModificationState.POST_PEER_MODIFICATION;
   }
 
   /**
@@ -123,80 +121,97 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
     throw new UnsupportedOperationException();
   }
 
-  private Stream<TableDescriptor> getTables(MasterProcedureEnv env) throws 
IOException {
-    ReplicationPeerConfig peerConfig = getNewPeerConfig();
-    Stream<TableDescriptor> stream = 
env.getMasterServices().getTableDescriptors().getAll().values()
-      .stream().filter(TableDescriptor::hasGlobalReplicationScope)
-      .filter(td -> ReplicationUtils.contains(peerConfig, td.getTableName()));
-    ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
-    if (oldPeerConfig != null && oldPeerConfig.isSerial()) {
-      stream = stream.filter(td -> !ReplicationUtils.contains(oldPeerConfig, 
td.getTableName()));
-    }
-    return stream;
+  protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv 
env)
+      throws IOException, ReplicationException {
+    throw new UnsupportedOperationException();
   }
 
   private void reopenRegions(MasterProcedureEnv env) throws IOException {
-    Stream<TableDescriptor> stream = getTables(env);
+    ReplicationPeerConfig peerConfig = getNewPeerConfig();
+    ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
     TableStateManager tsm = env.getMasterServices().getTableStateManager();
-    stream.filter(td -> {
-      try {
-        return tsm.getTableState(td.getTableName()).isEnabled();
-      } catch (TableStateNotFoundException e) {
-        return false;
-      } catch (IOException e) {
-        throw new UncheckedIOException(e);
+    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
+      if (!td.hasGlobalReplicationScope()) {
+        continue;
+      }
+      TableName tn = td.getTableName();
+      if (!ReplicationUtils.contains(peerConfig, tn)) {
+        continue;
+      }
+      if (oldPeerConfig != null && oldPeerConfig.isSerial() &&
+        ReplicationUtils.contains(oldPeerConfig, tn)) {
+        continue;
       }
-    }).forEach(td -> {
       try {
-        addChildProcedure(env.getAssignmentManager().createReopenProcedures(
-          env.getAssignmentManager().getRegionStates().getRegionsOfTable(td.getTableName())));
-      } catch (IOException e) {
-        throw new UncheckedIOException(e);
+        if (!tsm.getTableState(tn).isEnabled()) {
+          continue;
+        }
+      } catch (TableStateNotFoundException e) {
+        continue;
       }
-    });
+      addChildProcedure(env.getAssignmentManager().createReopenProcedures(
+        env.getAssignmentManager().getRegionStates().getRegionsOfTable(tn)));
+    }
   }
 
   private void addToMap(Map<String, Long> lastSeqIds, String 
encodedRegionName, long barrier,
       ReplicationQueueStorage queueStorage) throws ReplicationException {
     if (barrier >= 0) {
       lastSeqIds.put(encodedRegionName, barrier);
-      if (lastSeqIds.size() >= SET_LAST_SEQ_ID_BATCH_SIZE) {
+      if (lastSeqIds.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
         queueStorage.setLastSequenceIds(peerId, lastSeqIds);
         lastSeqIds.clear();
       }
     }
   }
 
-  private void setLastSequenceIdForSerialPeer(MasterProcedureEnv env)
-      throws IOException, ReplicationException {
-    Stream<TableDescriptor> stream = getTables(env);
+  protected final void setLastPushedSequenceId(MasterProcedureEnv env,
+      ReplicationPeerConfig peerConfig) throws IOException, ReplicationException {
+    Map<String, Long> lastSeqIds = new HashMap<String, Long>();
+    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
+      if (!td.hasGlobalReplicationScope()) {
+        continue;
+      }
+      TableName tn = td.getTableName();
+      if (!ReplicationUtils.contains(peerConfig, tn)) {
+        continue;
+      }
+      setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
+    }
+    if (!lastSeqIds.isEmpty()) {
+      env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds);
+    }
+  }
+
+  // Will put the encodedRegionName->lastPushedSeqId pair into the map passed in, if the map is
+  // large enough we will call queueStorage.setLastSequenceIds and clear the map. So the caller
+  // should not forget to check whether the map is empty at last, if not you should call
+  // queueStorage.setLastSequenceIds to write out the remaining entries in the map.
+  protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, TableName tableName,
+      Map<String, Long> lastSeqIds) throws IOException, ReplicationException {
     TableStateManager tsm = env.getMasterServices().getTableStateManager();
     ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
     Connection conn = env.getMasterServices().getConnection();
     RegionStates regionStates = env.getAssignmentManager().getRegionStates();
     MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
-    Map<String, Long> lastSeqIds = new HashMap<String, Long>();
-    stream.forEach(td -> {
-      try {
-        if (tsm.getTableState(td.getTableName()).isEnabled()) {
-          for (Pair<String, Long> name2Barrier : MetaTableAccessor
-            .getTableEncodedRegionNameAndLastBarrier(conn, td.getTableName())) {
-            addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
-              queueStorage);
-          }
-        } else {
-          for (RegionInfo region : regionStates.getRegionsOfTable(td.getTableName(), true)) {
-            long maxSequenceId =
-              WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
-            addToMap(lastSeqIds, region.getEncodedName(), maxSequenceId, queueStorage);
-          }
-        }
-      } catch (IOException | ReplicationException e) {
-        throw new RuntimeException(e);
+    boolean isTableEnabled;
+    try {
+      isTableEnabled = tsm.getTableState(tableName).isEnabled();
+    } catch (TableStateNotFoundException e) {
+      return;
+    }
+    if (isTableEnabled) {
+      for (Pair<String, Long> name2Barrier : MetaTableAccessor
+        .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
+        addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
+          queueStorage);
+      }
+    } else {
+      for (RegionInfo region : regionStates.getRegionsOfTable(tableName, true)) {
+        long maxSequenceId =
+          WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
+        addToMap(lastSeqIds, region.getEncodedName(), maxSequenceId, queueStorage);
       }
-    });
-    if (!lastSeqIds.isEmpty()) {
-      queueStorage.setLastSequenceIds(peerId, lastSeqIds);
     }
   }
 
@@ -232,8 +247,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
         return Flow.HAS_MORE_STATE;
       case REFRESH_PEER_ON_RS:
         refreshPeer(env, getPeerOperationType());
-        setNextState(reopenRegionsAfterRefresh() ? PeerModificationState.SERIAL_PEER_REOPEN_REGIONS
-          : PeerModificationState.POST_PEER_MODIFICATION);
+        setNextState(nextStateAfterRefresh());
         return Flow.HAS_MORE_STATE;
       case SERIAL_PEER_REOPEN_REGIONS:
         try {
@@ -246,7 +260,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
         return Flow.HAS_MORE_STATE;
       case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID:
         try {
-          setLastSequenceIdForSerialPeer(env);
+          updateLastPushedSequenceIdForSerialPeer(env);
         } catch (Exception e) {
           LOG.warn("{} set last sequence id for peer {} failed, retry", 
getClass().getName(),
             peerId, e);

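The comment on setLastPushedSequenceIdForTable spells out a flush-as-you-go contract: the helper may write out and clear the shared map whenever it reaches UPDATE_LAST_SEQ_ID_BATCH_SIZE entries, so the caller must flush the remainder itself. A sketch of how a subclass is expected to drive it (the helper name `flushAllLastPushedSequenceIds` and the `tables` argument are hypothetical, for illustration only):

  private void flushAllLastPushedSequenceIds(MasterProcedureEnv env, List<TableName> tables)
      throws IOException, ReplicationException {
    Map<String, Long> lastSeqIds = new HashMap<>();
    for (TableName tn : tables) {
      // may push a full batch to the queue storage and clear the map
      setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
    }
    if (!lastSeqIds.isEmpty()) {
      // write out whatever is left below the batch threshold
      env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds);
    }
  }
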
http://git-wip-us.apache.org/repos/asf/hbase/blob/74ab10c3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
index ccfd4a0..39c8fa9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
@@ -18,6 +18,14 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@@ -25,11 +33,13 @@ import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.UpdatePeerConfigStateData;
 
 /**
@@ -59,12 +69,84 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
     return PeerOperationType.UPDATE_CONFIG;
   }
 
+  private void addToList(List<String> encodedRegionNames, String 
encodedRegionName,
+      ReplicationQueueStorage queueStorage) throws ReplicationException {
+    encodedRegionNames.add(encodedRegionName);
+    if (encodedRegionNames.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
+      queueStorage.removeLastSequenceIds(peerId, encodedRegionNames);
+      encodedRegionNames.clear();
+    }
+  }
+
+  @Override
+  protected PeerModificationState nextStateAfterRefresh() {
+    if (peerConfig.isSerial()) {
+      if (oldPeerConfig.isSerial()) {
+        // both serial, then if the ns/table-cfs configs are not changed, just go with the normal
+        // way, otherwise we need to reopen the regions for the newly added tables.
+        return ReplicationUtils.isNamespacesAndTableCFsEqual(peerConfig, oldPeerConfig)
+          ? super.nextStateAfterRefresh()
+          : PeerModificationState.SERIAL_PEER_REOPEN_REGIONS;
+      } else {
+        // we change the peer to serial, need to reopen all regions
+        return PeerModificationState.SERIAL_PEER_REOPEN_REGIONS;
+      }
+    } else {
+      if (oldPeerConfig.isSerial()) {
+        // we remove the serial flag for peer, then we do not need to reopen all regions, but we
+        // need to remove the last pushed sequence ids.
+        return PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID;
+      } else {
+        // not serial for both, just go with the normal way.
+        return super.nextStateAfterRefresh();
+      }
+    }
+  }
+
   @Override
-  protected boolean reopenRegionsAfterRefresh() {
-    // If we remove some tables from the peer config then we do not need to enter the extra states
-    // for serial replication. Could try to optimize later since it is not easy to determine this...
-    return peerConfig.isSerial() && (!oldPeerConfig.isSerial() ||
-      !ReplicationUtils.isNamespacesAndTableCFsEqual(peerConfig, oldPeerConfig));
+  protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
+      throws IOException, ReplicationException {
+    if (!oldPeerConfig.isSerial()) {
+      assert peerConfig.isSerial();
+      // change to serial
+      setLastPushedSequenceId(env, peerConfig);
+      return;
+    }
+    if (!peerConfig.isSerial()) {
+      // remove the serial flag
+      env.getReplicationPeerManager().removeAllLastPushedSeqIds(peerId);
+      return;
+    }
+    // enter here means peerConfig and oldPeerConfig are both serial, let's find out the diffs and
+    // process them
+    ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
+    Connection conn = env.getMasterServices().getConnection();
+    Map<String, Long> lastSeqIds = new HashMap<String, Long>();
+    List<String> encodedRegionNames = new ArrayList<>();
+    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
+      if (!td.hasGlobalReplicationScope()) {
+        continue;
+      }
+      TableName tn = td.getTableName();
+      if (ReplicationUtils.contains(oldPeerConfig, tn)) {
+        if (!ReplicationUtils.contains(peerConfig, tn)) {
+          // removed from peer config
+          for (String encodedRegionName : MetaTableAccessor
+            .getTableEncodedRegionNamesForSerialReplication(conn, tn)) {
+            addToList(encodedRegionNames, encodedRegionName, queueStorage);
+          }
+        }
+      } else if (ReplicationUtils.contains(peerConfig, tn)) {
+        // newly added to peer config
+        setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
+      }
+    }
+    if (!encodedRegionNames.isEmpty()) {
+      queueStorage.removeLastSequenceIds(peerId, encodedRegionNames);
+    }
+    if (!lastSeqIds.isEmpty()) {
+      queueStorage.setLastSequenceIds(peerId, lastSeqIds);
+    }
   }
 
   @Override
@@ -99,7 +181,9 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
   @Override
   protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
     env.getReplicationPeerManager().updatePeerConfig(peerId, peerConfig);
-    if (enabled && reopenRegionsAfterRefresh()) {
+    // if we need to jump to the special states for serial peers, then we need to disable the peer
+    // first if it is not disabled yet.
+    if (enabled && nextStateAfterRefresh() != super.nextStateAfterRefresh()) {
       env.getReplicationPeerManager().disablePeer(peerId);
     }
   }

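From the client side, all of this is triggered by an ordinary peer-config update, exactly as the new test below does. For example, dropping every table from a serial peer's table-cfs map (the wrapper class, the Admin handle, and the peer id are assumed for illustration):

  import java.io.IOException;
  import java.util.Collections;
  import org.apache.hadoop.hbase.client.Admin;
  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

  public class RemoveTablesFromSerialPeer {
    // Clear the table-cfs map of a serial peer; UpdatePeerConfigProcedure then
    // removes the last pushed sequence ids of the regions that dropped out.
    static void removeAllTables(Admin admin, String peerId) throws IOException {
      admin.updateReplicationPeerConfig(peerId,
        ReplicationPeerConfig.newBuilder(admin.getReplicationPeerConfig(peerId))
          .setTableCFsMap(Collections.emptyMap()).build());
    }
  }
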
http://git-wip-us.apache.org/repos/asf/hbase/blob/74ab10c3/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java
index 3b807aa..7a1bc55 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java
@@ -186,8 +186,8 @@ public class TestEnableTable {
         fail("Got an exception while deleting " + tableName);
       }
       int rowCount = 0;
-      try (ResultScanner scanner =
-          metaTable.getScanner(MetaTableAccessor.getScanForTableName(TEST_UTIL.getConnection(), tableName))) {
+      try (ResultScanner scanner = metaTable
+        .getScanner(MetaTableAccessor.getScanForTableName(TEST_UTIL.getConnection(), tableName))) {
         for (Result result : scanner) {
           LOG.info("Found when none expected: " + result);
           rowCount++;

http://git-wip-us.apache.org/repos/asf/hbase/blob/74ab10c3/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestRemoveFromSerialReplicationPeer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestRemoveFromSerialReplicationPeer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestRemoveFromSerialReplicationPeer.java
new file mode 100644
index 0000000..eda15d8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestRemoveFromSerialReplicationPeer.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Collections;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+/**
+ * Testcase for HBASE-20296.
+ */
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestRemoveFromSerialReplicationPeer extends SerialReplicationTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRemoveFromSerialReplicationPeer.class);
+
+  @Before
+  public void setUp() throws IOException, StreamLacksCapabilityException {
+    setupWALWriter();
+  }
+
+  private void waitUntilHasLastPushedSequenceId(RegionInfo region) throws Exception {
+    ReplicationQueueStorage queueStorage =
+      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
+    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return queueStorage.getLastSequenceId(region.getEncodedName(), PEER_ID) > 0;
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "Still no last pushed sequence id for " + region;
+      }
+    });
+  }
+
+  @Test
+  public void testRemoveTable() throws Exception {
+    TableName tableName = createTable();
+    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+      .setClusterKey("127.0.0.1:2181:/hbase")
+      .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName())
+      .setReplicateAllUserTables(false)
+      .setTableCFsMap(ImmutableMap.of(tableName, Collections.emptyList())).setSerial(true).build();
+    UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo();
+    waitUntilHasLastPushedSequenceId(region);
+
+    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
+      ReplicationPeerConfig.newBuilder(peerConfig).setTableCFsMap(Collections.emptyMap()).build());
+
+    ReplicationQueueStorage queueStorage =
+      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
+    assertEquals(HConstants.NO_SEQNUM,
+      queueStorage.getLastSequenceId(region.getEncodedName(), PEER_ID));
+  }
+
+  @Test
+  public void testRemoveSerialFlag() throws Exception {
+    TableName tableName = createTable();
+    addPeer(true);
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo();
+    waitUntilHasLastPushedSequenceId(region);
+    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig
+      .newBuilder(UTIL.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(false).build());
+    waitUntilReplicationDone(100);
+
+    ReplicationQueueStorage queueStorage =
+      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
+    assertEquals(HConstants.NO_SEQNUM,
+      queueStorage.getLastSequenceId(region.getEncodedName(), PEER_ID));
+  }
+}
