HBASE-20296 Remove last pushed sequence ids when removing tables from a peer
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/74ab10c3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/74ab10c3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/74ab10c3

Branch: refs/heads/HBASE-20046-branch-2
Commit: 74ab10c353968cfcfe9b7ab07101bd3bfab74044
Parents: ead569c
Author: zhangduo <zhang...@apache.org>
Authored: Sat Mar 31 20:25:13 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Mon Apr 9 15:18:44 2018 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/MetaTableAccessor.java  |  72 +++++-----
 .../replication/ReplicationQueueStorage.java    |   9 ++
 .../replication/ZKReplicationQueueStorage.java  |  15 +++
 .../master/replication/AddPeerProcedure.java    |  14 +-
 .../master/replication/ModifyPeerProcedure.java | 134 ++++++++++---------
 .../replication/UpdatePeerConfigProcedure.java  |  96 ++++++++++++-
 .../hadoop/hbase/client/TestEnableTable.java    |   4 +-
 .../TestRemoveFromSerialReplicationPeer.java    | 120 +++++++++++++++++
 8 files changed, 363 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/74ab10c3/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index 4cc46c8..0f5ef09 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -57,6 +57,8 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.RegionState.State;
@@ -682,20 +684,19 @@ public class MetaTableAccessor {
     scanMeta(connection, null, null, QueryType.ALL, v);
   }
 
-  public static void scanMetaForTableRegions(Connection connection,
-      Visitor visitor, TableName tableName) throws IOException {
+  public static void scanMetaForTableRegions(Connection connection, Visitor visitor,
+      TableName tableName) throws IOException {
     scanMeta(connection, tableName, QueryType.REGION, Integer.MAX_VALUE, visitor);
   }
 
-  public static void scanMeta(Connection connection, TableName table,
-      QueryType type, int maxRows, final Visitor visitor) throws IOException {
+  public static void scanMeta(Connection connection, TableName table, QueryType type, int maxRows,
+      final Visitor visitor) throws IOException {
     scanMeta(connection, getTableStartRowForMeta(table, type), getTableStopRowForMeta(table, type),
-        type, maxRows, visitor);
+      type, maxRows, visitor);
   }
 
-  public static void scanMeta(Connection connection,
-      @Nullable final byte[] startRow, @Nullable final byte[] stopRow,
-      QueryType type, final Visitor visitor) throws IOException {
+  public static void scanMeta(Connection connection, @Nullable final byte[] startRow,
+      @Nullable final byte[] stopRow, QueryType type, final Visitor visitor) throws IOException {
     scanMeta(connection, startRow, stopRow,
       type, Integer.MAX_VALUE, visitor);
   }
@@ -708,26 +709,19 @@ public class MetaTableAccessor {
    * @param tableName table withing we scan
    * @param row start scan from this row
    * @param rowLimit max number of rows to return
-   * @throws IOException
    */
-  public static void scanMeta(Connection connection,
-      final Visitor visitor, final TableName tableName,
-      final byte[] row, final int rowLimit)
-  throws IOException {
-
+  public static void scanMeta(Connection connection, final Visitor visitor,
+      final TableName tableName, final byte[] row, final int rowLimit) throws IOException {
     byte[] startRow = null;
     byte[] stopRow = null;
     if (tableName != null) {
-      startRow =
-          getTableStartRowForMeta(tableName, QueryType.REGION);
+      startRow = getTableStartRowForMeta(tableName, QueryType.REGION);
       if (row != null) {
-        RegionInfo closestRi =
-            getClosestRegionInfo(connection, tableName, row);
-        startRow = RegionInfo
-            .createRegionName(tableName, closestRi.getStartKey(), HConstants.ZEROES, false);
+        RegionInfo closestRi = getClosestRegionInfo(connection, tableName, row);
+        startRow =
+          RegionInfo.createRegionName(tableName, closestRi.getStartKey(), HConstants.ZEROES, false);
       }
-      stopRow =
-          getTableStopRowForMeta(tableName, QueryType.REGION);
+      stopRow = getTableStopRowForMeta(tableName, QueryType.REGION);
     }
     scanMeta(connection, startRow, stopRow, QueryType.REGION, rowLimit, visitor);
   }
@@ -743,11 +737,16 @@ public class MetaTableAccessor {
    * @param type scanned part of meta
    * @param maxRows maximum rows to return
    * @param visitor Visitor invoked against each row.
-   * @throws IOException
    */
   public static void scanMeta(Connection connection, @Nullable final byte[] startRow,
       @Nullable final byte[] stopRow, QueryType type, int maxRows, final Visitor visitor)
       throws IOException {
+    scanMeta(connection, startRow, stopRow, type, null, maxRows, visitor);
+  }
+
+  private static void scanMeta(Connection connection, @Nullable final byte[] startRow,
+      @Nullable final byte[] stopRow, QueryType type, @Nullable Filter filter, int maxRows,
+      final Visitor visitor) throws IOException {
     int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE;
     Scan scan = getMetaScan(connection, rowUpperLimit);
@@ -760,13 +759,14 @@ public class MetaTableAccessor {
     if (stopRow != null) {
       scan.withStopRow(stopRow);
     }
+    if (filter != null) {
+      scan.setFilter(filter);
+    }
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Scanning META"
-          + " starting at row=" + Bytes.toStringBinary(startRow)
-          + " stopping at row=" + Bytes.toStringBinary(stopRow)
-          + " for max=" + rowUpperLimit
-          + " with caching=" + scan.getCaching());
+      LOG.trace("Scanning META" + " starting at row=" + Bytes.toStringBinary(startRow)
+          + " stopping at row=" + Bytes.toStringBinary(stopRow) + " for max=" + rowUpperLimit
+          + " with caching=" + scan.getCaching());
     }
 
     int currentRow = 0;
@@ -1973,7 +1973,7 @@ public class MetaTableAccessor {
     byte[] value = getParentsBytes(parents);
     put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
       .setFamily(HConstants.REPLICATION_BARRIER_FAMILY).setQualifier(REPLICATION_PARENT_QUALIFIER)
-      .setTimestamp(put.getTimeStamp()).setType(Type.Put).setValue(value).build());
+      .setTimestamp(put.getTimestamp()).setType(Type.Put).setValue(value).build());
   }
 
   private static Put makePutForReplicationBarrier(RegionInfo regionInfo, long openSeqNum, long ts)
@@ -1988,7 +1988,7 @@ public class MetaTableAccessor {
       .setRow(put.getRow())
       .setFamily(HConstants.REPLICATION_BARRIER_FAMILY)
       .setQualifier(HConstants.SEQNUM_QUALIFIER)
-      .setTimestamp(put.getTimeStamp())
+      .setTimestamp(put.getTimestamp())
       .setType(Type.Put)
       .setValue(Bytes.toBytes(openSeqNum))
       .build());
@@ -2128,6 +2128,18 @@ public class MetaTableAccessor {
     return list;
   }
 
+  public static List<String> getTableEncodedRegionNamesForSerialReplication(Connection conn,
+      TableName tableName) throws IOException {
+    List<String> list = new ArrayList<>();
+    scanMeta(conn, getTableStartRowForMeta(tableName, QueryType.REPLICATION),
+      getTableStopRowForMeta(tableName, QueryType.REPLICATION), QueryType.REPLICATION,
+      new FirstKeyOnlyFilter(), Integer.MAX_VALUE, r -> {
+        list.add(RegionInfo.encodeRegionName(r.getRow()));
+        return true;
+      });
+    return list;
+  }
+
   private static void debugLogMutations(List<? extends Mutation> mutations) throws IOException {
     if (!METALOG.isDebugEnabled()) {
       return;
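
For orientation, a minimal sketch of a caller of the new meta helper (hypothetical code, not part of this commit; it assumes an open Connection `conn` and an existing replicated table). The FirstKeyOnlyFilter is what keeps this scan cheap: only row keys are needed to recover the encoded region names, so barrier cell values are never shipped to the client.

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;

public class EncodedRegionNamesExample {
  // Collect the encoded name of every region of the table that still has a
  // replication barrier row in hbase:meta, via the helper added in this commit.
  static List<String> encodedRegionNames(Connection conn, TableName tableName)
      throws IOException {
    return MetaTableAccessor.getTableEncodedRegionNamesForSerialReplication(conn, tableName);
  }
}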
http://git-wip-us.apache.org/repos/asf/hbase/blob/74ab10c3/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
index cd37ac2..84653ad 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
@@ -91,6 +91,15 @@ public interface ReplicationQueueStorage {
    * @param peerId peer id
    */
  void removeLastSequenceIds(String peerId) throws ReplicationException;
+
+  /**
+   * Remove the max sequence id record for the given peer and regions.
+   * @param peerId peer id
+   * @param encodedRegionNames the encoded region names
+   */
+  void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
+      throws ReplicationException;
+
   /**
    * Get the current position for a specific WAL in a given queue for a given regionserver.
    * @param serverName the name of the regionserver
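
A sketch of how the new overload is intended to be driven (hypothetical caller; `storage` can be any ReplicationQueueStorage implementation). Unlike removeLastSequenceIds(peerId), which wipes every record for the peer, the two-argument variant deletes records for selected regions only; batching at the same threshold as the patch's UPDATE_LAST_SEQ_ID_BATCH_SIZE keeps each underlying call bounded.

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;

public class BatchedRemoveSketch {
  private static final int BATCH_SIZE = 1000; // mirrors UPDATE_LAST_SEQ_ID_BATCH_SIZE

  // Remove last pushed sequence ids for the given regions of a peer, flushing a
  // batch whenever it fills so no single storage call grows unbounded.
  static void removeInBatches(ReplicationQueueStorage storage, String peerId,
      List<String> encodedRegionNames) throws ReplicationException {
    List<String> batch = new ArrayList<>();
    for (String name : encodedRegionNames) {
      batch.add(name);
      if (batch.size() >= BATCH_SIZE) {
        storage.removeLastSequenceIds(peerId, batch);
        batch.clear();
      }
    }
    if (!batch.isEmpty()) { // do not forget the tail batch
      storage.removeLastSequenceIds(peerId, batch);
    }
  }
}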
http://git-wip-us.apache.org/repos/asf/hbase/blob/74ab10c3/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index 96b0b91..6d72128 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -28,6 +28,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.stream.Collectors;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -347,6 +348,20 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
   }
 
   @Override
+  public void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
+      throws ReplicationException {
+    try {
+      List<ZKUtilOp> listOfOps =
+        encodedRegionNames.stream().map(n -> getSerialReplicationRegionPeerNode(n, peerId))
+          .map(ZKUtilOp::deleteNodeFailSilent).collect(Collectors.toList());
+      ZKUtil.multiOrSequential(zookeeper, listOfOps, true);
+    } catch (KeeperException e) {
+      throw new ReplicationException("Failed to remove last sequence ids, peerId=" + peerId +
+        ", encodedRegionNames.size=" + encodedRegionNames.size(), e);
+    }
+  }
+
+  @Override
   public long getWALPosition(ServerName serverName, String queueId, String fileName)
       throws ReplicationException {
     byte[] bytes;
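
The implementation above folds all deletes into a single ZooKeeper multi. A focused sketch of the op-building step (hypothetical helper; `znodePaths` stands in for the per-region znodes computed by getSerialReplicationRegionPeerNode): deleteNodeFailSilent makes the operation idempotent, so a retry after a partial failure will not fail on znodes that are already gone, and passing `true` to multiOrSequential falls back to one-by-one ops if the multi is rejected.

import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;

public class DeleteOpsSketch {
  // Turn znode paths into delete operations for a single ZK multi; deleting a
  // missing znode is silently ignored, which keeps retries safe.
  static List<ZKUtilOp> buildDeleteOps(List<String> znodePaths) {
    return znodePaths.stream().map(ZKUtilOp::deleteNodeFailSilent).collect(Collectors.toList());
  }
}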
http://git-wip-us.apache.org/repos/asf/hbase/blob/74ab10c3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
index 72228f6..2f2d5a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.AddPeerStateData;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
 
 /**
  * The procedure for adding a new replication peer.
@@ -57,8 +58,15 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
   }
 
   @Override
-  protected boolean reopenRegionsAfterRefresh() {
-    return true;
+  protected PeerModificationState nextStateAfterRefresh() {
+    return peerConfig.isSerial() ? PeerModificationState.SERIAL_PEER_REOPEN_REGIONS
+      : super.nextStateAfterRefresh();
+  }
+
+  @Override
+  protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
+      throws IOException, ReplicationException {
+    setLastPushedSequenceId(env, peerConfig);
   }
 
   @Override
@@ -102,7 +110,7 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
   protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
     super.serializeStateData(serializer);
     serializer.serialize(AddPeerStateData.newBuilder()
-        .setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)).setEnabled(enabled).build());
+      .setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)).setEnabled(enabled).build());
   }
 
   @Override
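
In short, AddPeerProcedure now routes the state machine on the serial flag. A simplified standalone restatement of that decision (illustrative, not the procedure code itself):

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;

public class AddPeerRoutingSketch {
  // After REFRESH_PEER_ON_RS: a serial peer must first reopen regions (to lay down
  // replication barriers) and then seed last pushed sequence ids; a normal peer can
  // go straight to POST_PEER_MODIFICATION.
  static PeerModificationState nextState(boolean serialPeer) {
    return serialPeer ? PeerModificationState.SERIAL_PEER_REOPEN_REGIONS
        : PeerModificationState.POST_PEER_MODIFICATION;
  }
}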
http://git-wip-us.apache.org/repos/asf/hbase/blob/74ab10c3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
index 2b76487..8bedeff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
@@ -18,11 +18,10 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.stream.Stream;
 import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -55,7 +54,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
 
   private static final Logger LOG = LoggerFactory.getLogger(ModifyPeerProcedure.class);
 
-  private static final int SET_LAST_SEQ_ID_BATCH_SIZE = 1000;
+  protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000;
 
   protected ModifyPeerProcedure() {
   }
@@ -93,12 +92,11 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
   }
 
   /**
-   * Implementation class can override this method. The default return value is false which means we
-   * will jump to POST_PEER_MODIFICATION and finish the procedure. If returns true, we will jump to
-   * SERIAL_PEER_REOPEN_REGIONS.
+   * Implementation class can override this method. By default we will jump to
+   * POST_PEER_MODIFICATION and finish the procedure.
    */
-  protected boolean reopenRegionsAfterRefresh() {
-    return false;
+  protected PeerModificationState nextStateAfterRefresh() {
+    return PeerModificationState.POST_PEER_MODIFICATION;
   }
 
   /**
@@ -123,80 +121,97 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
     throw new UnsupportedOperationException();
   }
 
-  private Stream<TableDescriptor> getTables(MasterProcedureEnv env) throws IOException {
-    ReplicationPeerConfig peerConfig = getNewPeerConfig();
-    Stream<TableDescriptor> stream = env.getMasterServices().getTableDescriptors().getAll().values()
-        .stream().filter(TableDescriptor::hasGlobalReplicationScope)
-        .filter(td -> ReplicationUtils.contains(peerConfig, td.getTableName()));
-    ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
-    if (oldPeerConfig != null && oldPeerConfig.isSerial()) {
-      stream = stream.filter(td -> !ReplicationUtils.contains(oldPeerConfig, td.getTableName()));
-    }
-    return stream;
+  protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
+      throws IOException, ReplicationException {
+    throw new UnsupportedOperationException();
   }
 
   private void reopenRegions(MasterProcedureEnv env) throws IOException {
-    Stream<TableDescriptor> stream = getTables(env);
+    ReplicationPeerConfig peerConfig = getNewPeerConfig();
+    ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
     TableStateManager tsm = env.getMasterServices().getTableStateManager();
-    stream.filter(td -> {
-      try {
-        return tsm.getTableState(td.getTableName()).isEnabled();
-      } catch (TableStateNotFoundException e) {
-        return false;
-      } catch (IOException e) {
-        throw new UncheckedIOException(e);
+    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
+      if (!td.hasGlobalReplicationScope()) {
+        continue;
+      }
+      TableName tn = td.getTableName();
+      if (!ReplicationUtils.contains(peerConfig, tn)) {
+        continue;
+      }
+      if (oldPeerConfig != null && oldPeerConfig.isSerial() &&
+        ReplicationUtils.contains(oldPeerConfig, tn)) {
+        continue;
       }
-    }).forEach(td -> {
       try {
-        addChildProcedure(env.getAssignmentManager().createReopenProcedures(
-          env.getAssignmentManager().getRegionStates().getRegionsOfTable(td.getTableName())));
-      } catch (IOException e) {
-        throw new UncheckedIOException(e);
+        if (!tsm.getTableState(tn).isEnabled()) {
+          continue;
+        }
+      } catch (TableStateNotFoundException e) {
+        continue;
       }
-    });
+      addChildProcedure(env.getAssignmentManager().createReopenProcedures(
+        env.getAssignmentManager().getRegionStates().getRegionsOfTable(tn)));
+    }
   }
 
   private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
       ReplicationQueueStorage queueStorage) throws ReplicationException {
     if (barrier >= 0) {
       lastSeqIds.put(encodedRegionName, barrier);
-      if (lastSeqIds.size() >= SET_LAST_SEQ_ID_BATCH_SIZE) {
+      if (lastSeqIds.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
        queueStorage.setLastSequenceIds(peerId, lastSeqIds);
         lastSeqIds.clear();
       }
     }
   }
 
-  private void setLastSequenceIdForSerialPeer(MasterProcedureEnv env)
-      throws IOException, ReplicationException {
-    Stream<TableDescriptor> stream = getTables(env);
+  protected final void setLastPushedSequenceId(MasterProcedureEnv env,
+      ReplicationPeerConfig peerConfig) throws IOException, ReplicationException {
+    Map<String, Long> lastSeqIds = new HashMap<String, Long>();
+    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
+      if (!td.hasGlobalReplicationScope()) {
+        continue;
+      }
+      TableName tn = td.getTableName();
+      if (!ReplicationUtils.contains(peerConfig, tn)) {
+        continue;
+      }
+      setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
+    }
+    if (!lastSeqIds.isEmpty()) {
+      env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds);
+    }
+  }
+
+  // Will put the encodedRegionName->lastPushedSeqId pair into the map passed in, if the map is
+  // large enough we will call queueStorage.setLastSequenceIds and clear the map. So the caller
+  // should not forget to check whether the map is empty at last, if not you should call
+  // queueStorage.setLastSequenceIds to write out the remaining entries in the map.
+  protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, TableName tableName,
+      Map<String, Long> lastSeqIds) throws IOException, ReplicationException {
     TableStateManager tsm = env.getMasterServices().getTableStateManager();
     ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
     Connection conn = env.getMasterServices().getConnection();
     RegionStates regionStates = env.getAssignmentManager().getRegionStates();
     MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
-    Map<String, Long> lastSeqIds = new HashMap<String, Long>();
-    stream.forEach(td -> {
-      try {
-        if (tsm.getTableState(td.getTableName()).isEnabled()) {
-          for (Pair<String, Long> name2Barrier : MetaTableAccessor
-              .getTableEncodedRegionNameAndLastBarrier(conn, td.getTableName())) {
-            addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
-              queueStorage);
-          }
-        } else {
-          for (RegionInfo region : regionStates.getRegionsOfTable(td.getTableName(), true)) {
-            long maxSequenceId =
-              WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
-            addToMap(lastSeqIds, region.getEncodedName(), maxSequenceId, queueStorage);
-          }
-        }
-      } catch (IOException | ReplicationException e) {
-        throw new RuntimeException(e);
+    boolean isTableEnabled;
+    try {
+      isTableEnabled = tsm.getTableState(tableName).isEnabled();
+    } catch (TableStateNotFoundException e) {
+      return;
+    }
+    if (isTableEnabled) {
+      for (Pair<String, Long> name2Barrier : MetaTableAccessor
+        .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
+        addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
+          queueStorage);
+      }
+    } else {
+      for (RegionInfo region : regionStates.getRegionsOfTable(tableName, true)) {
+        long maxSequenceId =
+          WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
+        addToMap(lastSeqIds, region.getEncodedName(), maxSequenceId, queueStorage);
       }
-    });
-    if (!lastSeqIds.isEmpty()) {
-      queueStorage.setLastSequenceIds(peerId, lastSeqIds);
     }
   }
@@ -232,8 +247,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
         return Flow.HAS_MORE_STATE;
       case REFRESH_PEER_ON_RS:
         refreshPeer(env, getPeerOperationType());
-        setNextState(reopenRegionsAfterRefresh() ? PeerModificationState.SERIAL_PEER_REOPEN_REGIONS
-            : PeerModificationState.POST_PEER_MODIFICATION);
+        setNextState(nextStateAfterRefresh());
         return Flow.HAS_MORE_STATE;
       case SERIAL_PEER_REOPEN_REGIONS:
         try {
@@ -246,7 +260,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
         return Flow.HAS_MORE_STATE;
       case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID:
         try {
-          setLastSequenceIdForSerialPeer(env);
+          updateLastPushedSequenceIdForSerialPeer(env);
         } catch (Exception e) {
           LOG.warn("{} set last sequence id for peer {} failed, retry", getClass().getName(),
             peerId, e);
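
One subtlety in setLastPushedSequenceIdForTable above is where the "last pushed" id comes from. A standalone restatement of that choice (illustrative only; the real code reads hbase:meta and the region directory on the filesystem):

public class SeedLastPushedSketch {
  // For an enabled table the regions are open and writing, so the last replication
  // barrier recorded in hbase:meta, minus one, is the right starting point; for a
  // disabled table the regions are closed, so the max sequence id persisted under
  // the region directory is used instead.
  static long lastPushedSeqId(boolean tableEnabled, long lastBarrier, long maxRegionSeqId) {
    return tableEnabled ? lastBarrier - 1 : maxRegionSeqId;
  }
}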
http://git-wip-us.apache.org/repos/asf/hbase/blob/74ab10c3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
index ccfd4a0..39c8fa9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
@@ -18,6 +18,14 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@@ -25,11 +33,13 @@ import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.UpdatePeerConfigStateData;
 
 /**
@@ -59,12 +69,84 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
     return PeerOperationType.UPDATE_CONFIG;
   }
 
+  private void addToList(List<String> encodedRegionNames, String encodedRegionName,
+      ReplicationQueueStorage queueStorage) throws ReplicationException {
+    encodedRegionNames.add(encodedRegionName);
+    if (encodedRegionNames.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
+      queueStorage.removeLastSequenceIds(peerId, encodedRegionNames);
+      encodedRegionNames.clear();
+    }
+  }
+
+  @Override
+  protected PeerModificationState nextStateAfterRefresh() {
+    if (peerConfig.isSerial()) {
+      if (oldPeerConfig.isSerial()) {
+        // both serial, then if the ns/table-cfs configs are not changed, just go with the normal
+        // way, otherwise we need to reopen the regions for the newly added tables.
+        return ReplicationUtils.isNamespacesAndTableCFsEqual(peerConfig, oldPeerConfig)
+          ? super.nextStateAfterRefresh()
+          : PeerModificationState.SERIAL_PEER_REOPEN_REGIONS;
+      } else {
+        // we change the peer to serial, need to reopen all regions
+        return PeerModificationState.SERIAL_PEER_REOPEN_REGIONS;
+      }
+    } else {
+      if (oldPeerConfig.isSerial()) {
+        // we remove the serial flag for peer, then we do not need to reopen all regions, but we
+        // need to remove the last pushed sequence ids.
+        return PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID;
+      } else {
+        // not serial for both, just go with the normal way.
+        return super.nextStateAfterRefresh();
+      }
+    }
+  }
+
   @Override
-  protected boolean reopenRegionsAfterRefresh() {
-    // If we remove some tables from the peer config then we do not need to enter the extra states
-    // for serial replication. Could try to optimize later since it is not easy to determine this...
-    return peerConfig.isSerial() && (!oldPeerConfig.isSerial() ||
-      !ReplicationUtils.isNamespacesAndTableCFsEqual(peerConfig, oldPeerConfig));
+  protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
+      throws IOException, ReplicationException {
+    if (!oldPeerConfig.isSerial()) {
+      assert peerConfig.isSerial();
+      // change to serial
+      setLastPushedSequenceId(env, peerConfig);
+      return;
+    }
+    if (!peerConfig.isSerial()) {
+      // remove the serial flag
+      env.getReplicationPeerManager().removeAllLastPushedSeqIds(peerId);
+      return;
+    }
+    // enter here means peerConfig and oldPeerConfig are both serial, let's find out the diffs and
+    // process them
+    ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
+    Connection conn = env.getMasterServices().getConnection();
+    Map<String, Long> lastSeqIds = new HashMap<String, Long>();
+    List<String> encodedRegionNames = new ArrayList<>();
+    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
+      if (!td.hasGlobalReplicationScope()) {
+        continue;
+      }
+      TableName tn = td.getTableName();
+      if (ReplicationUtils.contains(oldPeerConfig, tn)) {
+        if (!ReplicationUtils.contains(peerConfig, tn)) {
+          // removed from peer config
+          for (String encodedRegionName : MetaTableAccessor
+            .getTableEncodedRegionNamesForSerialReplication(conn, tn)) {
+            addToList(encodedRegionNames, encodedRegionName, queueStorage);
+          }
+        }
+      } else if (ReplicationUtils.contains(peerConfig, tn)) {
+        // newly added to peer config
+        setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
+      }
+    }
+    if (!encodedRegionNames.isEmpty()) {
+      queueStorage.removeLastSequenceIds(peerId, encodedRegionNames);
+    }
+    if (!lastSeqIds.isEmpty()) {
+      queueStorage.setLastSequenceIds(peerId, lastSeqIds);
+    }
   }
 
   @Override
@@ -99,7 +181,9 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
   @Override
   protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
     env.getReplicationPeerManager().updatePeerConfig(peerId, peerConfig);
-    if (enabled && reopenRegionsAfterRefresh()) {
+    // if we need to jump to the special states for serial peers, then we need to disable the peer
+    // first if it is not disabled yet.
+    if (enabled && nextStateAfterRefresh() != super.nextStateAfterRefresh()) {
       env.getReplicationPeerManager().disablePeer(peerId);
     }
   }
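
The four old/new serial-flag combinations handled by this class can be condensed into a single routing function; the following is an illustrative restatement of nextStateAfterRefresh, not the procedure code itself:

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;

public class UpdatePeerRoutingSketch {
  static PeerModificationState route(boolean oldSerial, boolean newSerial, boolean sameTableSet) {
    if (newSerial && oldSerial) {
      // both serial: only a changed namespace/table-cfs set forces region reopens
      return sameTableSet ? PeerModificationState.POST_PEER_MODIFICATION
          : PeerModificationState.SERIAL_PEER_REOPEN_REGIONS;
    }
    if (newSerial) {
      // turned serial on: reopen all matching regions to record barriers
      return PeerModificationState.SERIAL_PEER_REOPEN_REGIONS;
    }
    if (oldSerial) {
      // turned serial off: no reopen needed, just clean up last pushed sequence ids
      return PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID;
    }
    // not serial before or after: plain config update
    return PeerModificationState.POST_PEER_MODIFICATION;
  }
}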
http://git-wip-us.apache.org/repos/asf/hbase/blob/74ab10c3/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java
index 3b807aa..7a1bc55 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java
@@ -186,8 +186,8 @@ public class TestEnableTable {
       fail("Got an exception while deleting " + tableName);
     }
     int rowCount = 0;
-    try (ResultScanner scanner =
-        metaTable.getScanner(MetaTableAccessor.getScanForTableName(TEST_UTIL.getConnection(), tableName))) {
+    try (ResultScanner scanner = metaTable
+      .getScanner(MetaTableAccessor.getScanForTableName(TEST_UTIL.getConnection(), tableName))) {
       for (Result result : scanner) {
         LOG.info("Found when none expected: " + result);
         rowCount++;

http://git-wip-us.apache.org/repos/asf/hbase/blob/74ab10c3/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestRemoveFromSerialReplicationPeer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestRemoveFromSerialReplicationPeer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestRemoveFromSerialReplicationPeer.java
new file mode 100644
index 0000000..eda15d8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestRemoveFromSerialReplicationPeer.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Collections;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+/**
+ * Testcase for HBASE-20296.
+ */
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestRemoveFromSerialReplicationPeer extends SerialReplicationTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRemoveFromSerialReplicationPeer.class);
+
+  @Before
+  public void setUp() throws IOException, StreamLacksCapabilityException {
+    setupWALWriter();
+  }
+
+  private void waitUntilHasLastPushedSequenceId(RegionInfo region) throws Exception {
+    ReplicationQueueStorage queueStorage =
+      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
+    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return queueStorage.getLastSequenceId(region.getEncodedName(), PEER_ID) > 0;
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "Still no last pushed sequence id for " + region;
+      }
+    });
+  }
+
+  @Test
+  public void testRemoveTable() throws Exception {
+    TableName tableName = createTable();
+    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+      .setClusterKey("127.0.0.1:2181:/hbase")
+      .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName())
+      .setReplicateAllUserTables(false)
+      .setTableCFsMap(ImmutableMap.of(tableName, Collections.emptyList())).setSerial(true).build();
+    UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo();
+    waitUntilHasLastPushedSequenceId(region);
+
+    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
+      ReplicationPeerConfig.newBuilder(peerConfig).setTableCFsMap(Collections.emptyMap()).build());
+
+    ReplicationQueueStorage queueStorage =
+      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
+    assertEquals(HConstants.NO_SEQNUM,
+      queueStorage.getLastSequenceId(region.getEncodedName(), PEER_ID));
+  }
+
+  @Test
+  public void testRemoveSerialFlag() throws Exception {
+    TableName tableName = createTable();
+    addPeer(true);
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo();
+    waitUntilHasLastPushedSequenceId(region);
+    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig
+      .newBuilder(UTIL.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(false).build());
+    waitUntilReplicationDone(100);
+
+    ReplicationQueueStorage queueStorage =
+      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
+    assertEquals(HConstants.NO_SEQNUM,
+      queueStorage.getLastSequenceId(region.getEncodedName(), PEER_ID));
+  }
+}