HBASE-19957 General framework to transit sync replication state
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/58b5849f Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/58b5849f Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/58b5849f Branch: refs/heads/HBASE-19064 Commit: 58b5849feae6aa0103c8751707fd63a4a10009fd Parents: e935a4c Author: zhangduo <zhang...@apache.org> Authored: Fri Feb 9 18:33:28 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Fri May 25 10:11:48 2018 +0800 ---------------------------------------------------------------------- .../replication/ReplicationPeerConfig.java | 2 - .../replication/ReplicationPeerDescription.java | 5 +- .../hbase/replication/SyncReplicationState.java | 19 +- .../org/apache/hadoop/hbase/HConstants.java | 3 + .../src/main/protobuf/MasterProcedure.proto | 20 +- .../hbase/replication/ReplicationPeerImpl.java | 45 ++++- .../replication/ReplicationPeerStorage.java | 25 ++- .../hbase/replication/ReplicationPeers.java | 27 ++- .../replication/ZKReplicationPeerStorage.java | 63 +++++-- .../hbase/coprocessor/MasterObserver.java | 7 +- .../org/apache/hadoop/hbase/master/HMaster.java | 4 +- .../hbase/master/MasterCoprocessorHost.java | 12 +- .../replication/AbstractPeerProcedure.java | 14 +- .../master/replication/ModifyPeerProcedure.java | 11 -- .../replication/RefreshPeerProcedure.java | 18 +- .../replication/ReplicationPeerManager.java | 89 +++++---- ...ransitPeerSyncReplicationStateProcedure.java | 181 ++++++++++++------- .../hbase/regionserver/HRegionServer.java | 35 ++-- .../regionserver/ReplicationSourceService.java | 11 +- .../regionserver/PeerActionListener.java | 4 +- .../regionserver/PeerProcedureHandler.java | 16 +- .../regionserver/PeerProcedureHandlerImpl.java | 52 +++++- .../regionserver/RefreshPeerCallable.java | 7 + .../replication/regionserver/Replication.java | 22 ++- .../regionserver/ReplicationSourceManager.java | 41 +++-- .../SyncReplicationPeerInfoProvider.java | 43 +++++ 
.../SyncReplicationPeerInfoProviderImpl.java | 71 ++++++++ .../SyncReplicationPeerMappingManager.java | 48 +++++ .../SyncReplicationPeerProvider.java | 35 ---- .../hbase/wal/SyncReplicationWALProvider.java | 35 ++-- .../org/apache/hadoop/hbase/wal/WALFactory.java | 47 ++--- .../replication/TestReplicationAdmin.java | 3 +- .../TestReplicationSourceManager.java | 5 +- .../wal/TestSyncReplicationWALProvider.java | 36 ++-- 34 files changed, 743 insertions(+), 313 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index 997a155..cc7b4bc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.hadoop.hbase.replication; import java.util.Collection; @@ -25,7 +24,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; - import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.util.Bytes; http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java index 2d077c5..b0c27bb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java @@ -20,7 +20,10 @@ package org.apache.hadoop.hbase.replication; import org.apache.yetus.audience.InterfaceAudience; /** - * The POJO equivalent of ReplicationProtos.ReplicationPeerDescription + * The POJO equivalent of ReplicationProtos.ReplicationPeerDescription. + * <p> + * Note to developers: the new sync replication state is not stored here, since it is only an + * intermediate state and this class is public.
*/ @InterfaceAudience.Public public class ReplicationPeerDescription { http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java index a65b144..de9576c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java @@ -29,14 +29,19 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; /** * Used by synchronous replication. Indicate the state of the current cluster in a synchronous * replication peer. The state may be one of {@link SyncReplicationState#ACTIVE}, - * {@link SyncReplicationState#DOWNGRADE_ACTIVE} or - * {@link SyncReplicationState#STANDBY}. + * {@link SyncReplicationState#DOWNGRADE_ACTIVE} or {@link SyncReplicationState#STANDBY}. * <p> * For asynchronous replication, the state is {@link SyncReplicationState#NONE}. 
*/ @InterfaceAudience.Public public enum SyncReplicationState { - NONE, ACTIVE, DOWNGRADE_ACTIVE, STANDBY; + NONE(0), ACTIVE(1), DOWNGRADE_ACTIVE(2), STANDBY(3); + + private final byte value; + + private SyncReplicationState(int value) { + this.value = (byte) value; + } public static SyncReplicationState valueOf(int value) { switch (value) { @@ -53,13 +58,17 @@ public enum SyncReplicationState { } } + public int value() { + return value & 0xFF; + } + public static byte[] toByteArray(SyncReplicationState state) { return ProtobufUtil - .prependPBMagic(ReplicationPeerConfigUtil.toSyncReplicationState(state).toByteArray()); + .prependPBMagic(ReplicationPeerConfigUtil.toSyncReplicationState(state).toByteArray()); } public static SyncReplicationState parseFrom(byte[] bytes) throws InvalidProtocolBufferException { return ReplicationPeerConfigUtil.toSyncReplicationState(ReplicationProtos.SyncReplicationState - .parseFrom(Arrays.copyOfRange(bytes, ProtobufUtil.lengthOfPBMagic(), bytes.length))); + .parseFrom(Arrays.copyOfRange(bytes, ProtobufUtil.lengthOfPBMagic(), bytes.length))); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 9241682..522c2cf 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1355,6 +1355,9 @@ public final class HConstants { public static final String NOT_IMPLEMENTED = "Not implemented"; + // TODO: need to find a better place to hold it. + public static final String SYNC_REPLICATION_ENABLED = "hbase.replication.sync.enabled"; + private HConstants() { // Can't be instantiated with this ctor. 
} http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index 39fc72a..67c1b43 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -389,6 +389,17 @@ enum PeerModificationState { POST_PEER_MODIFICATION = 8; } +enum PeerSyncReplicationStateTransitionState { + PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION = 1; + SET_PEER_NEW_SYNC_REPLICATION_STATE = 2; + REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN = 3; + REPLAY_REMOTE_WAL_IN_PEER = 4; + REOPEN_ALL_REGIONS_IN_PEER = 5; + TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE = 6; + REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END = 7; + POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 8; +} + message PeerModificationStateData { required string peer_id = 1; } @@ -399,18 +410,23 @@ enum PeerModificationType { ENABLE_PEER = 3; DISABLE_PEER = 4; UPDATE_PEER_CONFIG = 5; + TRANSIT_SYNC_REPLICATION_STATE = 6; } message RefreshPeerStateData { required string peer_id = 1; required PeerModificationType type = 2; required ServerName target_server = 3; + /** We need multiple stages for sync replication state transition **/ + optional uint32 stage = 4 [default = 0]; } message RefreshPeerParameter { required string peer_id = 1; required PeerModificationType type = 2; required ServerName target_server = 3; + /** We need multiple stages for sync replication state transition **/ + optional uint32 stage = 4 [default = 0];; } message PeerProcedureStateData { @@ -438,5 +454,7 @@ message DisablePeerStateData { } message TransitPeerSyncReplicationStateStateData { - required SyncReplicationState syncReplicationState = 1; + /** Could be null if we fail in pre check, so optional */ + optional 
SyncReplicationState fromState = 1; + required SyncReplicationState toState = 2; } http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java index ff3f662..22026e5 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private @@ -36,7 +37,14 @@ public class ReplicationPeerImpl implements ReplicationPeer { private volatile PeerState peerState; - private volatile SyncReplicationState syncReplicationState; + // The lower 16 bits are the current sync replication state, the higher 16 bits are the new sync + // replication state. Both are embedded in one int so that users cannot get an inconsistent view + // of the state and the new state.
+ private volatile int syncReplicationStateBits; + + private static final int SHIFT = 16; + + private static final int AND_BITS = 0xFFFF; private final List<ReplicationPeerConfigListener> peerConfigListeners; @@ -48,12 +56,14 @@ public class ReplicationPeerImpl implements ReplicationPeer { * @param peerConfig configuration for the replication peer */ public ReplicationPeerImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig, - boolean peerState, SyncReplicationState syncReplicationState) { + boolean peerState, SyncReplicationState syncReplicationState, + SyncReplicationState newSyncReplicationState) { this.conf = conf; this.id = id; this.peerState = peerState ? PeerState.ENABLED : PeerState.DISABLED; this.peerConfig = peerConfig; - this.syncReplicationState = syncReplicationState; + this.syncReplicationStateBits = + syncReplicationState.value() | (newSyncReplicationState.value() << SHIFT); this.peerConfigListeners = new ArrayList<>(); } @@ -66,6 +76,16 @@ public class ReplicationPeerImpl implements ReplicationPeer { peerConfigListeners.forEach(listener -> listener.peerConfigUpdated(peerConfig)); } + public void setNewSyncReplicationState(SyncReplicationState newState) { + this.syncReplicationStateBits = + (this.syncReplicationStateBits & AND_BITS) | (newState.value() << SHIFT); + } + + public void transitSyncReplicationState() { + this.syncReplicationStateBits = + (this.syncReplicationStateBits >>> SHIFT) | (SyncReplicationState.NONE.value() << SHIFT); + } + /** * Get the identifier of this peer * @return string representation of the id (short) @@ -80,9 +100,26 @@ public class ReplicationPeerImpl implements ReplicationPeer { return peerState; } + private static SyncReplicationState getSyncReplicationState(int bits) { + return SyncReplicationState.valueOf(bits & AND_BITS); + } + + private static SyncReplicationState getNewSyncReplicationState(int bits) { + return SyncReplicationState.valueOf(bits >>> SHIFT); + } + + public Pair<SyncReplicationState, 
SyncReplicationState> getSyncReplicationStateAndNewState() { + int bits = this.syncReplicationStateBits; + return Pair.newPair(getSyncReplicationState(bits), getNewSyncReplicationState(bits)); + } + + public SyncReplicationState getNewSyncReplicationState() { + return getNewSyncReplicationState(syncReplicationStateBits); + } + @Override public SyncReplicationState getSyncReplicationState() { - return syncReplicationState; + return getSyncReplicationState(syncReplicationStateBits); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java index d2538ab..f74ac37 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.replication; import java.util.List; - import org.apache.yetus.audience.InterfaceAudience; /** @@ -72,16 +71,30 @@ public interface ReplicationPeerStorage { ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException; /** - * Set the state of current cluster in a synchronous replication peer. + * Set the new sync replication state that we are going to transit to. * @throws ReplicationException if there are errors accessing the storage service. */ - void setPeerSyncReplicationState(String peerId, SyncReplicationState state) + void setPeerNewSyncReplicationState(String peerId, SyncReplicationState state) throws ReplicationException; /** - * Get the state of current cluster in a synchronous replication peer. 
+ * Overwrite the sync replication state with the new sync replication state which is set with the + * {@link #setPeerNewSyncReplicationState(String, SyncReplicationState)} method above, and clear + * the new sync replication state. * @throws ReplicationException if there are errors accessing the storage service. */ - SyncReplicationState getPeerSyncReplicationState(String peerId) - throws ReplicationException; + void transitPeerSyncReplicationState(String peerId) throws ReplicationException; + + /** + * Get the sync replication state. + * @throws ReplicationException if there are errors accessing the storage service. + */ + SyncReplicationState getPeerSyncReplicationState(String peerId) throws ReplicationException; + + /** + * Get the new sync replication state. Will return {@link SyncReplicationState#NONE} if we are + * not in a transition. + * @throws ReplicationException if there are errors accessing the storage service. + */ + SyncReplicationState getPeerNewSyncReplicationState(String peerId) throws ReplicationException; } http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java index a54f339..ba6da7a 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -80,8 +80,8 @@ public class ReplicationPeers { return true; } - public void removePeer(String peerId) { - peerCache.remove(peerId); + public ReplicationPeerImpl removePeer(String peerId) { + return peerCache.remove(peerId); } /** @@ -110,22 +110,29 @@ public class ReplicationPeers { public PeerState 
refreshPeerState(String peerId) throws ReplicationException { ReplicationPeerImpl peer = peerCache.get(peerId); - if (peer == null) { - throw new ReplicationException("Peer with id=" + peerId + " is not cached."); - } peer.setPeerState(peerStorage.isPeerEnabled(peerId)); return peer.getPeerState(); } public ReplicationPeerConfig refreshPeerConfig(String peerId) throws ReplicationException { ReplicationPeerImpl peer = peerCache.get(peerId); - if (peer == null) { - throw new ReplicationException("Peer with id=" + peerId + " is not cached."); - } peer.setPeerConfig(peerStorage.getPeerConfig(peerId)); return peer.getPeerConfig(); } + public SyncReplicationState refreshPeerNewSyncReplicationState(String peerId) + throws ReplicationException { + ReplicationPeerImpl peer = peerCache.get(peerId); + SyncReplicationState newState = peerStorage.getPeerNewSyncReplicationState(peerId); + peer.setNewSyncReplicationState(newState); + return newState; + } + + public void transitPeerSyncReplicationState(String peerId) { + ReplicationPeerImpl peer = peerCache.get(peerId); + peer.transitSyncReplicationState(); + } + /** * Helper method to connect to a peer * @param peerId peer's identifier @@ -135,7 +142,9 @@ public class ReplicationPeers { ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId); boolean enabled = peerStorage.isPeerEnabled(peerId); SyncReplicationState syncReplicationState = peerStorage.getPeerSyncReplicationState(peerId); + SyncReplicationState newSyncReplicationState = + peerStorage.getPeerNewSyncReplicationState(peerId); return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf), - peerId, peerConfig, enabled, syncReplicationState); + peerId, peerConfig, enabled, syncReplicationState, newSyncReplicationState); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java 
---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java index 9107cf6..a2cdfdf 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java @@ -53,7 +53,12 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase public static final byte[] DISABLED_ZNODE_BYTES = toByteArray(ReplicationProtos.ReplicationState.State.DISABLED); - public static final String SYNCHRONOUS_REPLICATION_STATE_ZNODE = "sync-rep-state"; + public static final String SYNC_REPLICATION_STATE_ZNODE = "sync-rep-state"; + + public static final String NEW_SYNC_REPLICATION_STATE_ZNODE = "new-sync-rep-state"; + + public static final byte[] NONE_STATE_ZNODE_BYTES = + SyncReplicationState.toByteArray(SyncReplicationState.NONE); /** * The name of the znode that contains the replication status of a remote slave (i.e. peer) @@ -85,7 +90,11 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase @VisibleForTesting public String getSyncReplicationStateNode(String peerId) { - return ZNodePaths.joinZNode(getPeerNode(peerId), SYNCHRONOUS_REPLICATION_STATE_ZNODE); + return ZNodePaths.joinZNode(getPeerNode(peerId), SYNC_REPLICATION_STATE_ZNODE); + } + + private String getNewSyncReplicationStateNode(String peerId) { + return ZNodePaths.joinZNode(getPeerNode(peerId), NEW_SYNC_REPLICATION_STATE_ZNODE); } @Override @@ -97,14 +106,15 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId), enabled ? 
ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES), ZKUtilOp.createAndFailSilent(getSyncReplicationStateNode(peerId), - SyncReplicationState.toByteArray(syncReplicationState))); + SyncReplicationState.toByteArray(syncReplicationState)), + ZKUtilOp.createAndFailSilent(getNewSyncReplicationStateNode(peerId), NONE_STATE_ZNODE_BYTES)); try { ZKUtil.createWithParents(zookeeper, peersZNode); ZKUtil.multiOrSequential(zookeeper, multiOps, false); } catch (KeeperException e) { throw new ReplicationException( "Could not add peer with id=" + peerId + ", peerConfig=>" + peerConfig + ", state=" + - (enabled ? "ENABLED" : "DISABLED") + ", syncReplicationState=" + syncReplicationState, + (enabled ? "ENABLED" : "DISABLED") + ", syncReplicationState=" + syncReplicationState, e); } } @@ -136,7 +146,7 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase ReplicationPeerConfigUtil.toByteArray(peerConfig)); } catch (KeeperException e) { throw new ReplicationException( - "There was a problem trying to save changes to the " + "replication peer " + peerId, e); + "There was a problem trying to save changes to the " + "replication peer " + peerId, e); } } @@ -170,38 +180,63 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase } if (data == null || data.length == 0) { throw new ReplicationException( - "Replication peer config data shouldn't be empty, peerId=" + peerId); + "Replication peer config data shouldn't be empty, peerId=" + peerId); } try { return ReplicationPeerConfigUtil.parsePeerFrom(data); } catch (DeserializationException e) { throw new ReplicationException( - "Failed to parse replication peer config for peer with id=" + peerId, e); + "Failed to parse replication peer config for peer with id=" + peerId, e); } } @Override - public void setPeerSyncReplicationState(String peerId, SyncReplicationState state) + public void setPeerNewSyncReplicationState(String peerId, SyncReplicationState state) throws ReplicationException { try { - 
ZKUtil.setData(zookeeper, getSyncReplicationStateNode(peerId), + ZKUtil.createSetData(zookeeper, getNewSyncReplicationStateNode(peerId), SyncReplicationState.toByteArray(state)); } catch (KeeperException e) { throw new ReplicationException( - "Unable to change the cluster state for the synchronous replication peer with id=" + peerId, - e); + "Unable to set the new sync replication state for peer with id=" + peerId, e); } } @Override - public SyncReplicationState getPeerSyncReplicationState(String peerId) + public void transitPeerSyncReplicationState(String peerId) throws ReplicationException { + String newStateNode = getNewSyncReplicationStateNode(peerId); + try { + byte[] data = ZKUtil.getData(zookeeper, newStateNode); + ZKUtil.multiOrSequential(zookeeper, + Arrays.asList(ZKUtilOp.setData(newStateNode, NONE_STATE_ZNODE_BYTES), + ZKUtilOp.setData(getSyncReplicationStateNode(peerId), data)), + false); + } catch (KeeperException | InterruptedException e) { + throw new ReplicationException( + "Error transiting sync replication state for peer with id=" + peerId, e); + } + } + + private SyncReplicationState getSyncReplicationState(String peerId, String path) throws ReplicationException { try { - byte[] data = ZKUtil.getData(zookeeper, getSyncReplicationStateNode(peerId)); + byte[] data = ZKUtil.getData(zookeeper, path); return SyncReplicationState.parseFrom(data); } catch (KeeperException | InterruptedException | IOException e) { throw new ReplicationException( - "Error getting cluster state for the synchronous replication peer with id=" + peerId, e); + "Error getting sync replication state of path " + path + " for peer with id=" + peerId, e); } } + + @Override + public SyncReplicationState getPeerNewSyncReplicationState(String peerId) + throws ReplicationException { + return getSyncReplicationState(peerId, getNewSyncReplicationStateNode(peerId)); + } + + @Override + public SyncReplicationState getPeerSyncReplicationState(String peerId) + throws ReplicationException { + 
return getSyncReplicationState(peerId, getSyncReplicationStateNode(peerId)); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java index 1fc0f37..a0aba3b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java @@ -1334,7 +1334,7 @@ public interface MasterObserver { * Called before transit current cluster state for the specified synchronous replication peer * @param ctx the environment to interact with the framework and master * @param peerId a short name that identifies the peer - * @param state a new state + * @param state the new state */ default void preTransitReplicationPeerSyncReplicationState( final ObserverContext<MasterCoprocessorEnvironment> ctx, String peerId, @@ -1345,11 +1345,12 @@ public interface MasterObserver { * Called after transit current cluster state for the specified synchronous replication peer * @param ctx the environment to interact with the framework and master * @param peerId a short name that identifies the peer - * @param state a new state + * @param from the old state + * @param to the new state */ default void postTransitReplicationPeerSyncReplicationState( final ObserverContext<MasterCoprocessorEnvironment> ctx, String peerId, - SyncReplicationState state) throws IOException { + SyncReplicationState from, SyncReplicationState to) throws IOException { } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 8129f9e..e7e585d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -130,10 +130,10 @@ import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure; import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure; +import org.apache.hadoop.hbase.master.replication.AbstractPeerProcedure; import org.apache.hadoop.hbase.master.replication.AddPeerProcedure; import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure; import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure; -import org.apache.hadoop.hbase.master.replication.ModifyPeerProcedure; import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure; import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStateProcedure; @@ -3421,7 +3421,7 @@ public class HMaster extends HRegionServer implements MasterServices { return favoredNodesManager; } - private long executePeerProcedure(ModifyPeerProcedure procedure) throws IOException { + private long executePeerProcedure(AbstractPeerProcedure<?> procedure) throws IOException { long procId = procedureExecutor.submitProcedure(procedure); procedure.getLatch().await(); return procId; http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java index af57096..56ae2c0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java @@ -1535,22 +1535,22 @@ public class MasterCoprocessorHost }); } - public void preTransitReplicationPeerSyncReplicationState(final String peerId, - final SyncReplicationState clusterState) throws IOException { + public void preTransitReplicationPeerSyncReplicationState(String peerId, + SyncReplicationState state) throws IOException { execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { @Override public void call(MasterObserver observer) throws IOException { - observer.preTransitReplicationPeerSyncReplicationState(this, peerId, clusterState); + observer.preTransitReplicationPeerSyncReplicationState(this, peerId, state); } }); } - public void postTransitReplicationPeerSyncReplicationState(final String peerId, - final SyncReplicationState clusterState) throws IOException { + public void postTransitReplicationPeerSyncReplicationState(String peerId, + SyncReplicationState from, SyncReplicationState to) throws IOException { execOperation(coprocEnvironments.isEmpty() ? 
null : new MasterObserverOperation() { @Override public void call(MasterObserver observer) throws IOException { - observer.postTransitReplicationPeerSyncReplicationState(this, peerId, clusterState); + observer.postTransitReplicationPeerSyncReplicationState(this, peerId, from, to); } }); } http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java index 0ad8a63..6679d78 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java @@ -46,7 +46,7 @@ public abstract class AbstractPeerProcedure<TState> protected AbstractPeerProcedure(String peerId) { this.peerId = peerId; - this.latch = ProcedurePrepareLatch.createLatch(2, 0); + this.latch = ProcedurePrepareLatch.createLatch(2, 1); } public ProcedurePrepareLatch getLatch() { @@ -94,4 +94,16 @@ public abstract class AbstractPeerProcedure<TState> super.deserializeStateData(serializer); peerId = serializer.deserialize(PeerProcedureStateData.class).getPeerId(); } + + @Override + protected void rollbackState(MasterProcedureEnv env, TState state) + throws IOException, InterruptedException { + if (state == getInitialState()) { + // actually the peer related operations have no rollback, but if we haven't done any + // modifications on the peer storage yet, we can just return.
+ return; + } + throw new UnsupportedOperationException(); + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java index ea2e314..32b8ea1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java @@ -328,17 +328,6 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi } @Override - protected void rollbackState(MasterProcedureEnv env, PeerModificationState state) - throws IOException, InterruptedException { - if (state == PeerModificationState.PRE_PEER_MODIFICATION) { - // actually the peer related operations has no rollback, but if we haven't done any - // modifications on the peer storage yet, we can just return. 
- return; - } - throw new UnsupportedOperationException(); - } - - @Override protected PeerModificationState getState(int stateId) { return PeerModificationState.forNumber(stateId); } http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java index ba9bcdc..d51ea63 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java @@ -54,6 +54,8 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv> justification = "Will never change after construction") private ServerName targetServer; + private int stage; + private boolean dispatched; private ProcedureEvent<?> event; @@ -64,9 +66,15 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv> } public RefreshPeerProcedure(String peerId, PeerOperationType type, ServerName targetServer) { + this(peerId, type, targetServer, 0); + } + + public RefreshPeerProcedure(String peerId, PeerOperationType type, ServerName targetServer, + int stage) { this.peerId = peerId; this.type = type; this.targetServer = targetServer; + this.stage = stage; } @Override @@ -91,6 +99,8 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv> return PeerModificationType.DISABLE_PEER; case UPDATE_CONFIG: return PeerModificationType.UPDATE_PEER_CONFIG; + case TRANSIT_SYNC_REPLICATION_STATE: + return PeerModificationType.TRANSIT_SYNC_REPLICATION_STATE; default: throw new IllegalArgumentException("Unknown type: " + type); } @@ -108,6 +118,8 @@ public class RefreshPeerProcedure extends 
Procedure<MasterProcedureEnv> return PeerOperationType.DISABLE; case UPDATE_PEER_CONFIG: return PeerOperationType.UPDATE_CONFIG; + case TRANSIT_SYNC_REPLICATION_STATE: + return PeerOperationType.TRANSIT_SYNC_REPLICATION_STATE; default: throw new IllegalArgumentException("Unknown type: " + type); } @@ -118,7 +130,8 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv> assert targetServer.equals(remote); return new ServerOperation(this, getProcId(), RefreshPeerCallable.class, RefreshPeerParameter.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type)) - .setTargetServer(ProtobufUtil.toServerName(remote)).build().toByteArray()); + .setTargetServer(ProtobufUtil.toServerName(remote)).setStage(stage).build() + .toByteArray()); } private void complete(MasterProcedureEnv env, Throwable error) { @@ -193,7 +206,7 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv> protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { serializer.serialize( RefreshPeerStateData.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type)) - .setTargetServer(ProtobufUtil.toServerName(targetServer)).build()); + .setTargetServer(ProtobufUtil.toServerName(targetServer)).setStage(stage).build()); } @Override @@ -202,5 +215,6 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv> peerId = data.getPeerId(); type = toPeerOperationType(data.getType()); targetServer = ProtobufUtil.toServerName(data.getTargetServer()); + stage = data.getStage(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java 
index ff778a8..0dc922d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.EnumMap; import java.util.EnumSet; import java.util.List; import java.util.Map; @@ -50,6 +49,9 @@ import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; +import org.apache.hbase.thirdparty.com.google.common.collect.Maps; + /** * Manages and performs all replication admin operations. * <p> @@ -64,15 +66,11 @@ public class ReplicationPeerManager { private final ConcurrentMap<String, ReplicationPeerDescription> peers; - private final EnumMap<SyncReplicationState, EnumSet<SyncReplicationState>> allowedTransition = - new EnumMap<SyncReplicationState, EnumSet<SyncReplicationState>>(SyncReplicationState.class) { - { - put(SyncReplicationState.ACTIVE, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE)); - put(SyncReplicationState.STANDBY, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE)); - put(SyncReplicationState.DOWNGRADE_ACTIVE, - EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE)); - } - }; + private final ImmutableMap<SyncReplicationState, EnumSet<SyncReplicationState>> + allowedTransition = Maps.immutableEnumMap(ImmutableMap.of(SyncReplicationState.ACTIVE, + EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE), SyncReplicationState.STANDBY, + EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE), SyncReplicationState.DOWNGRADE_ACTIVE, + EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE))); ReplicationPeerManager(ReplicationPeerStorage peerStorage, 
ReplicationQueueStorage queueStorage, ConcurrentMap<String, ReplicationPeerDescription> peers) { @@ -165,9 +163,9 @@ public class ReplicationPeerManager { if (!isStringEquals(peerConfig.getRemoteWALDir(), oldPeerConfig.getRemoteWALDir())) { throw new DoNotRetryIOException( - "Changing the remote wal dir on an existing peer is not allowed. Existing remote wal " + - "dir '" + oldPeerConfig.getRemoteWALDir() + "' for peer " + peerId + - " does not match new remote wal dir '" + peerConfig.getRemoteWALDir() + "'"); + "Changing the remote wal dir on an existing peer is not allowed. Existing remote wal " + + "dir '" + oldPeerConfig.getRemoteWALDir() + "' for peer " + peerId + + " does not match new remote wal dir '" + peerConfig.getRemoteWALDir() + "'"); } if (oldPeerConfig.isSyncReplication()) { @@ -180,15 +178,19 @@ public class ReplicationPeerManager { return desc; } - public void preTransitPeerSyncReplicationState(String peerId, SyncReplicationState state) - throws DoNotRetryIOException { + /** + * @return the old state. + */ + public SyncReplicationState preTransitPeerSyncReplicationState(String peerId, + SyncReplicationState state) throws DoNotRetryIOException { ReplicationPeerDescription desc = checkPeerExists(peerId); SyncReplicationState fromState = desc.getSyncReplicationState(); EnumSet<SyncReplicationState> allowedToStates = allowedTransition.get(fromState); if (allowedToStates == null || !allowedToStates.contains(state)) { throw new DoNotRetryIOException("Can not transit current cluster state from " + fromState + - " to " + state + " for peer id=" + peerId); + " to " + state + " for peer id=" + peerId); } + return fromState; } public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) @@ -199,8 +201,8 @@ public class ReplicationPeerManager { } ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build(); SyncReplicationState syncReplicationState = - copiedPeerConfig.isSyncReplication() ? 
SyncReplicationState.DOWNGRADE_ACTIVE - : SyncReplicationState.NONE; + copiedPeerConfig.isSyncReplication() ? SyncReplicationState.DOWNGRADE_ACTIVE + : SyncReplicationState.NONE; peerStorage.addPeer(peerId, copiedPeerConfig, enabled, syncReplicationState); peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig, syncReplicationState)); @@ -240,7 +242,7 @@ public class ReplicationPeerManager { ReplicationPeerDescription desc = peers.get(peerId); ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig(); ReplicationPeerConfigBuilder newPeerConfigBuilder = - ReplicationPeerConfig.newBuilder(peerConfig); + ReplicationPeerConfig.newBuilder(peerConfig); // we need to use the new conf to overwrite the old one. newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration()); newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration()); @@ -257,7 +259,7 @@ public class ReplicationPeerManager { return new ArrayList<>(peers.values()); } return peers.values().stream().filter(r -> pattern.matcher(r.getPeerId()).matches()) - .collect(Collectors.toList()); + .collect(Collectors.toList()); } public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) { @@ -269,12 +271,23 @@ public class ReplicationPeerManager { queueStorage.removeLastSequenceIds(peerId); } - public void transitPeerSyncReplicationState(String peerId, SyncReplicationState state) + public void setPeerNewSyncReplicationState(String peerId, SyncReplicationState state) throws ReplicationException { + peerStorage.setPeerNewSyncReplicationState(peerId, state); + } + + public void transitPeerSyncReplicationState(String peerId, SyncReplicationState newState) + throws ReplicationException { + if (peerStorage.getPeerNewSyncReplicationState(peerId) != SyncReplicationState.NONE) { + // Only transit if this is not a retry + peerStorage.transitPeerSyncReplicationState(peerId); + } ReplicationPeerDescription desc = peers.get(peerId); - 
peerStorage.setPeerSyncReplicationState(peerId, state); - peers.put(peerId, - new ReplicationPeerDescription(peerId, desc.isEnabled(), desc.getPeerConfig(), state)); + if (desc.getSyncReplicationState() != newState) { + // Only recreate the desc if this is not a retry + peers.put(peerId, + new ReplicationPeerDescription(peerId, desc.isEnabled(), desc.getPeerConfig(), newState)); + } } public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException { @@ -301,10 +314,10 @@ public class ReplicationPeerManager { // If replicate_all flag is true, it means all user tables will be replicated to peer cluster. // Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer // cluster. - if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) - || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { - throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly " - + "when you want replicate all cluster"); + if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) || + (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { + throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly " + + "when you want replicate all cluster"); } checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(), peerConfig.getExcludeTableCFsMap()); @@ -312,13 +325,13 @@ public class ReplicationPeerManager { // If replicate_all flag is false, it means all user tables can't be replicated to peer // cluster. Then allow to config namespaces or table-cfs which will be replicated to peer // cluster. 
- if ((peerConfig.getExcludeNamespaces() != null - && !peerConfig.getExcludeNamespaces().isEmpty()) - || (peerConfig.getExcludeTableCFsMap() != null - && !peerConfig.getExcludeTableCFsMap().isEmpty())) { + if ((peerConfig.getExcludeNamespaces() != null && + !peerConfig.getExcludeNamespaces().isEmpty()) || + (peerConfig.getExcludeTableCFsMap() != null && + !peerConfig.getExcludeTableCFsMap().isEmpty())) { throw new DoNotRetryIOException( - "Need clean exclude-namespaces or exclude-table-cfs config firstly" - + " when replicate_all flag is false"); + "Need clean exclude-namespaces or exclude-table-cfs config firstly" + + " when replicate_all flag is false"); } checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(), peerConfig.getTableCFsMap()); @@ -338,11 +351,11 @@ public class ReplicationPeerManager { // TODO: Add namespace, replicat_all flag back if (peerConfig.replicateAllUserTables()) { throw new DoNotRetryIOException( - "Only support replicated table config for sync replication peer"); + "Only support replicated table config for sync replication peer"); } if (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) { throw new DoNotRetryIOException( - "Only support replicated table config for sync replication peer"); + "Only support replicated table config for sync replication peer"); } if (peerConfig.getTableCFsMap() == null || peerConfig.getTableCFsMap().isEmpty()) { throw new DoNotRetryIOException("Need config replicated tables for sync replication peer"); @@ -350,7 +363,7 @@ public class ReplicationPeerManager { for (List<String> cfs : peerConfig.getTableCFsMap().values()) { if (cfs != null && !cfs.isEmpty()) { throw new DoNotRetryIOException( - "Only support replicated table config for sync replication peer"); + "Only support replicated table config for sync replication peer"); } } } @@ -394,7 +407,7 @@ public class ReplicationPeerManager { private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) throws 
DoNotRetryIOException { String filterCSV = peerConfig.getConfiguration() - .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY); + .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY); if (filterCSV != null && !filterCSV.isEmpty()) { String[] filters = filterCSV.split(","); for (String filter : filters) { http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java index aad3b06..8fc932f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java @@ -18,11 +18,12 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; - +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; -import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; @@ -32,26 +33,29 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerSyncReplicationStateTransitionState; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.TransitPeerSyncReplicationStateStateData; /** - * The procedure for transit current cluster state for a synchronous replication peer. + * The procedure for transit current sync replication state for a synchronous replication peer. */ @InterfaceAudience.Private -public class TransitPeerSyncReplicationStateProcedure extends ModifyPeerProcedure { +public class TransitPeerSyncReplicationStateProcedure + extends AbstractPeerProcedure<PeerSyncReplicationStateTransitionState> { private static final Logger LOG = LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class); - private SyncReplicationState state; + private SyncReplicationState fromState; + + private SyncReplicationState toState; public TransitPeerSyncReplicationStateProcedure() { } public TransitPeerSyncReplicationStateProcedure(String peerId, SyncReplicationState state) { super(peerId); - this.state = state; + this.toState = state; } @Override @@ -60,99 +64,154 @@ public class TransitPeerSyncReplicationStateProcedure extends ModifyPeerProcedur } @Override - protected void prePeerModification(MasterProcedureEnv env) - throws IOException, ReplicationException { - MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); - if (cpHost != null) { - cpHost.preTransitReplicationPeerSyncReplicationState(peerId, state); + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + TransitPeerSyncReplicationStateStateData.Builder builder = + TransitPeerSyncReplicationStateStateData.newBuilder() + .setToState(ReplicationPeerConfigUtil.toSyncReplicationState(toState)); + if (fromState != null) { + 
builder.setFromState(ReplicationPeerConfigUtil.toSyncReplicationState(fromState)); } - env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, state); + serializer.serialize(builder.build()); } @Override - protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { - env.getReplicationPeerManager().transitPeerSyncReplicationState(peerId, state); + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + TransitPeerSyncReplicationStateStateData data = + serializer.deserialize(TransitPeerSyncReplicationStateStateData.class); + toState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getToState()); + if (data.hasFromState()) { + fromState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getFromState()); + } } @Override - protected void postPeerModification(MasterProcedureEnv env) - throws IOException, ReplicationException { - LOG.info("Successfully transit current cluster state to {} in synchronous replication peer {}", - state, peerId); + protected PeerSyncReplicationStateTransitionState getState(int stateId) { + return PeerSyncReplicationStateTransitionState.forNumber(stateId); + } + + @Override + protected int getStateId(PeerSyncReplicationStateTransitionState state) { + return state.getNumber(); + } + + @Override + protected PeerSyncReplicationStateTransitionState getInitialState() { + return PeerSyncReplicationStateTransitionState.PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION; + } + + private void preTransit(MasterProcedureEnv env) throws IOException { MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); if (cpHost != null) { - env.getMasterCoprocessorHost().postTransitReplicationPeerSyncReplicationState(peerId, state); + cpHost.preTransitReplicationPeerSyncReplicationState(peerId, toState); } + fromState = env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, toState); } - @Override - protected 
void serializeStateData(ProcedureStateSerializer serializer) throws IOException { - super.serializeStateData(serializer); - serializer.serialize(TransitPeerSyncReplicationStateStateData.newBuilder() - .setSyncReplicationState(ReplicationPeerConfigUtil.toSyncReplicationState(state)).build()); + private void postTransit(MasterProcedureEnv env) throws IOException { + LOG.info( + "Successfully transit current cluster state from {} to {} for sync replication peer {}", + fromState, toState, peerId); + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + env.getMasterCoprocessorHost().postTransitReplicationPeerSyncReplicationState(peerId, + fromState, toState); + } } - @Override - protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { - super.deserializeStateData(serializer); - TransitPeerSyncReplicationStateStateData data = - serializer.deserialize(TransitPeerSyncReplicationStateStateData.class); - state = ReplicationPeerConfigUtil.toSyncReplicationState(data.getSyncReplicationState()); + private List<RegionInfo> getRegionsToReopen(MasterProcedureEnv env) { + return env.getReplicationPeerManager().getPeerConfig(peerId).get().getTableCFsMap().keySet() + .stream() + .flatMap(tn -> env.getAssignmentManager().getRegionStates().getRegionsOfTable(tn).stream()) + .collect(Collectors.toList()); } @Override - protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state) + protected Flow executeFromState(MasterProcedureEnv env, + PeerSyncReplicationStateTransitionState state) throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { switch (state) { - case PRE_PEER_MODIFICATION: + case PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION: try { - prePeerModification(env); + preTransit(env); } catch (IOException e) { - LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, " + - "mark the procedure as failure and give up", getClass().getName(), 
peerId, e); - setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e); - releaseLatch(); + LOG.warn("Failed to call pre CP hook or the pre check is failed for peer {} " + + "when transiting sync replication peer state to {}, " + + "mark the procedure as failure and give up", peerId, toState, e); + setFailure("master-transit-peer-sync-replication-state", e); return Flow.NO_MORE_STATE; - } catch (ReplicationException e) { - LOG.warn("{} failed to call prePeerModification for peer {}, retry", getClass().getName(), - peerId, e); - throw new ProcedureYieldException(); } - setNextState(PeerModificationState.UPDATE_PEER_STORAGE); + setNextState(PeerSyncReplicationStateTransitionState.SET_PEER_NEW_SYNC_REPLICATION_STATE); return Flow.HAS_MORE_STATE; - case UPDATE_PEER_STORAGE: + case SET_PEER_NEW_SYNC_REPLICATION_STATE: try { - updatePeerStorage(env); + env.getReplicationPeerManager().setPeerNewSyncReplicationState(peerId, toState); } catch (ReplicationException e) { - LOG.warn("{} update peer storage for peer {} failed, retry", getClass().getName(), peerId, - e); + LOG.warn("Failed to update peer storage for peer {} when starting transiting sync " + + "replication peer state from {} to {}, retry", peerId, fromState, toState, e); throw new ProcedureYieldException(); } - setNextState(PeerModificationState.REFRESH_PEER_ON_RS); + setNextState( + PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN); + return Flow.HAS_MORE_STATE; + case REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN: + addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream() + .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 0)) + .toArray(RefreshPeerProcedure[]::new)); + if (fromState == SyncReplicationState.STANDBY && + toState == SyncReplicationState.DOWNGRADE_ACTIVE) { + setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER); + } else { + 
setNextState(PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER); + } + return Flow.HAS_MORE_STATE; + case REPLAY_REMOTE_WAL_IN_PEER: + // TODO: replay remote wal when transiting from S to DA. + setNextState(PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER); return Flow.HAS_MORE_STATE; - case REFRESH_PEER_ON_RS: - // TODO: Need add child procedure for every RegionServer - setNextState(PeerModificationState.POST_PEER_MODIFICATION); + case REOPEN_ALL_REGIONS_IN_PEER: + try { + addChildProcedure( + env.getAssignmentManager().createReopenProcedures(getRegionsToReopen(env))); + } catch (IOException e) { + LOG.warn("Failed to schedule region reopen for peer {} when starting transiting sync " + + "replication peer state from {} to {}, retry", peerId, fromState, toState, e); + throw new ProcedureYieldException(); + } + setNextState( + PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE); return Flow.HAS_MORE_STATE; - case POST_PEER_MODIFICATION: + case TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE: try { - postPeerModification(env); + env.getReplicationPeerManager().transitPeerSyncReplicationState(peerId, toState); } catch (ReplicationException e) { - LOG.warn("{} failed to call postPeerModification for peer {}, retry", - getClass().getName(), peerId, e); + LOG.warn("Failed to update peer storage for peer {} when ending transiting sync " + + "replication peer state from {} to {}, retry", peerId, fromState, toState, e); throw new ProcedureYieldException(); + } + setNextState( + PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END); + return Flow.HAS_MORE_STATE; + case REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END: + addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream() + .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 1)) + .toArray(RefreshPeerProcedure[]::new)); + setNextState( + 
PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION); + case POST_PEER_SYNC_REPLICATION_STATE_TRANSITION: + try { + postTransit(env); } catch (IOException e) { - LOG.warn("{} failed to call post CP hook for peer {}, " + - "ignore since the procedure has already done", getClass().getName(), peerId, e); + LOG.warn( + "Failed to call post CP hook for peer {} when transiting sync replication " + + "peer state from {} to {}, ignore since the procedure has already done", + peerId, fromState, toState, e); } - releaseLatch(); return Flow.NO_MORE_STATE; default: throw new UnsupportedOperationException("unhandled state=" + state); } } - private void releaseLatch() { - ProcedurePrepareLatch.releaseLatch(latch, this); - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index a17b402..9c23750 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1801,21 +1801,27 @@ public class HRegionServer extends HasThread implements * be hooked up to WAL. 
*/ private void setupWALAndReplication() throws IOException { + boolean isMasterNoTableOrSystemTableOnly = this instanceof HMaster && + (!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf)); + if (isMasterNoTableOrSystemTableOnly) { + conf.setBoolean(HConstants.SYNC_REPLICATION_ENABLED, false); + } WALFactory factory = new WALFactory(conf, serverName.toString()); + if (!isMasterNoTableOrSystemTableOnly) { + // TODO Replication make assumptions here based on the default filesystem impl + Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); + String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString()); - // TODO Replication make assumptions here based on the default filesystem impl - Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); - String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString()); - - Path logDir = new Path(walRootDir, logName); - LOG.debug("logDir={}", logDir); - if (this.walFs.exists(logDir)) { - throw new RegionServerRunningException( - "Region server has already created directory at " + this.serverName.toString()); + Path logDir = new Path(walRootDir, logName); + LOG.debug("logDir={}", logDir); + if (this.walFs.exists(logDir)) { + throw new RegionServerRunningException( + "Region server has already created directory at " + this.serverName.toString()); + } + // Instantiate replication if replication enabled. Pass it the log directories. + createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, + factory.getWALProvider()); } - // Instantiate replication if replication enabled. Pass it the log directories. 
- createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, - factory.getWALProvider()); this.walFactory = factory; } @@ -2937,11 +2943,6 @@ public class HRegionServer extends HasThread implements */ private static void createNewReplicationInstance(Configuration conf, HRegionServer server, FileSystem walFs, Path walDir, Path oldWALDir, WALProvider walProvider) throws IOException { - if ((server instanceof HMaster) && - (!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf))) { - return; - } - // read in the name of the source replication class from the config file. String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java index 23ba773..4529943 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java @@ -18,17 +18,22 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.hadoop.hbase.replication.regionserver.PeerProcedureHandler; +import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider; import org.apache.yetus.audience.InterfaceAudience; /** - * A source for a replication stream has to expose this service. - * This service allows an application to hook into the - * regionserver and watch for new transactions. + * A source for a replication stream has to expose this service. 
This service allows an application + * to hook into the regionserver and watch for new transactions. */ @InterfaceAudience.Private public interface ReplicationSourceService extends ReplicationService { /** + * Returns an info provider for sync replication peer. + */ + SyncReplicationPeerInfoProvider getSyncReplicationPeerInfoProvider(); + + /** * Returns a Handler to handle peer procedures. */ PeerProcedureHandler getPeerProcedureHandler(); http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java index 6df2af9..efafd09 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java @@ -28,8 +28,8 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public interface PeerActionListener { - default void peerRemoved(String peerId) {} + static final PeerActionListener DUMMY = new PeerActionListener() {}; default void peerSyncReplicationStateChange(String peerId, SyncReplicationState from, - SyncReplicationState to) {} + SyncReplicationState to, int stage) {} } http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java index 
65da9af..52b604b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java @@ -15,11 +15,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; - +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.yetus.audience.InterfaceAudience; @@ -29,13 +28,16 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public interface PeerProcedureHandler { - public void addPeer(String peerId) throws ReplicationException, IOException; + void addPeer(String peerId) throws ReplicationException, IOException; + + void removePeer(String peerId) throws ReplicationException, IOException; - public void removePeer(String peerId) throws ReplicationException, IOException; + void disablePeer(String peerId) throws ReplicationException, IOException; - public void disablePeer(String peerId) throws ReplicationException, IOException; + void enablePeer(String peerId) throws ReplicationException, IOException; - public void enablePeer(String peerId) throws ReplicationException, IOException; + void updatePeerConfig(String peerId) throws ReplicationException, IOException; - public void updatePeerConfig(String peerId) throws ReplicationException, IOException; + void transitSyncReplicationPeerState(String peerId, int stage, HRegionServer rs) + throws ReplicationException, IOException; } http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java index 78c1977..7fc9f53 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java @@ -19,23 +19,32 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.util.concurrent.locks.Lock; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerImpl; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationUtils; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.util.KeyLocker; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @InterfaceAudience.Private public class PeerProcedureHandlerImpl implements PeerProcedureHandler { + private static final Logger LOG = LoggerFactory.getLogger(PeerProcedureHandlerImpl.class); + private final ReplicationSourceManager replicationSourceManager; + private final PeerActionListener peerActionListener; private final KeyLocker<String> peersLock = new KeyLocker<>(); - public PeerProcedureHandlerImpl(ReplicationSourceManager replicationSourceManager) { + public PeerProcedureHandlerImpl(ReplicationSourceManager replicationSourceManager, + PeerActionListener peerActionListener) { this.replicationSourceManager = replicationSourceManager; + this.peerActionListener = peerActionListener; } @Override @@ 
-61,7 +70,6 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { } private void refreshPeerState(String peerId) throws ReplicationException, IOException { - PeerState newState; Lock peerLock = peersLock.acquireLock(peerId); ReplicationPeerImpl peer = null; PeerState oldState = null; @@ -72,7 +80,7 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { throw new ReplicationException("Peer with id=" + peerId + " is not cached."); } oldState = peer.getPeerState(); - newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId); + PeerState newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId); // RS need to start work with the new replication state change if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) { replicationSourceManager.refreshSources(peerId); @@ -132,4 +140,42 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { peerLock.unlock(); } } + + @Override + public void transitSyncReplicationPeerState(String peerId, int stage, HRegionServer rs) + throws ReplicationException, IOException { + ReplicationPeers replicationPeers = replicationSourceManager.getReplicationPeers(); + Lock peerLock = peersLock.acquireLock(peerId); + try { + ReplicationPeerImpl peer = replicationPeers.getPeer(peerId); + if (peer == null) { + throw new ReplicationException("Peer with id=" + peerId + " is not cached."); + } + if (!peer.getPeerConfig().isSyncReplication()) { + throw new ReplicationException("Peer with id=" + peerId + " is not synchronous."); + } + SyncReplicationState newState = peer.getNewSyncReplicationState(); + if (stage == 0) { + if (newState != SyncReplicationState.NONE) { + LOG.warn("The new sync replication state for peer {} has already been set to {}, " + + "this should be a retry, give up", peerId, newState); + return; + } + newState = replicationPeers.refreshPeerNewSyncReplicationState(peerId); + SyncReplicationState oldState 
= peer.getSyncReplicationState(); + peerActionListener.peerSyncReplicationStateChange(peerId, oldState, newState, stage); + } else { + if (newState == SyncReplicationState.NONE) { + LOG.warn("The new sync replication state for peer {} has already been clear, and the " + + "current state is {}, this should be a retry, give up", peerId, newState); + return; + } + SyncReplicationState oldState = peer.getSyncReplicationState(); + peerActionListener.peerSyncReplicationStateChange(peerId, oldState, newState, stage); + peer.transitSyncReplicationState(); + } + } finally { + peerLock.unlock(); + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java index 7ada24b..8fe16bc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java @@ -35,12 +35,15 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.R public class RefreshPeerCallable implements RSProcedureCallable { private static final Logger LOG = Logger.getLogger(RefreshPeerCallable.class); + private HRegionServer rs; private String peerId; private PeerModificationType type; + private int stage; + private Exception initError; @Override @@ -67,6 +70,9 @@ public class RefreshPeerCallable implements RSProcedureCallable { case UPDATE_PEER_CONFIG: handler.updatePeerConfig(this.peerId); break; + case TRANSIT_SYNC_REPLICATION_STATE: + handler.transitSyncReplicationPeerState(peerId, stage, rs); + break; default: throw new 
IllegalArgumentException("Unknown peer modification type: " + type); } @@ -80,6 +86,7 @@ public class RefreshPeerCallable implements RSProcedureCallable { RefreshPeerParameter param = RefreshPeerParameter.parseFrom(parameter); this.peerId = param.getPeerId(); this.type = param.getType(); + this.stage = param.getStage(); } catch (InvalidProtocolBufferException e) { initError = e; } http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 8290ac3..2846d2c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider; import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.yetus.audience.InterfaceAudience; @@ -66,6 +67,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer private ReplicationTracker replicationTracker; private Configuration conf; private ReplicationSink replicationSink; + private SyncReplicationPeerInfoProvider syncReplicationPeerInfoProvider; // Hosting server private Server server; /** Statistics thread schedule pool */ @@ -120,19 +122,30 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer } catch 
(KeeperException ke) { throw new IOException("Could not read cluster id", ke); } + SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager(); this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, - walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty()); + walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(), + mapping); + this.syncReplicationPeerInfoProvider = + new SyncReplicationPeerInfoProviderImpl(replicationPeers, mapping); + PeerActionListener peerActionListener = PeerActionListener.DUMMY; if (walProvider != null) { walProvider .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager)); + if (walProvider instanceof SyncReplicationWALProvider) { + SyncReplicationWALProvider syncWALProvider = (SyncReplicationWALProvider) walProvider; + peerActionListener = syncWALProvider; + syncWALProvider.setPeerInfoProvider(syncReplicationPeerInfoProvider); + } } this.statsThreadPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); LOG.debug("Replication stats-in-log period={} seconds", this.statsThreadPeriod); this.replicationLoad = new ReplicationLoad(); - this.peerProcedureHandler = new PeerProcedureHandlerImpl(replicationManager); + this.peerProcedureHandler = + new PeerProcedureHandlerImpl(replicationManager, peerActionListener); } @Override @@ -270,4 +283,9 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics(); this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics); } + + @Override + public SyncReplicationPeerInfoProvider getSyncReplicationPeerInfoProvider() { + return syncReplicationPeerInfoProvider; + } } 
http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 9b4a22c..4212597 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationListener; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerImpl; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; @@ -135,6 +136,8 @@ public class ReplicationSourceManager implements ReplicationListener { // For recovered source, the queue id's format is peer_id-servername-* private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues; + private final SyncReplicationPeerMappingManager syncReplicationPeerMappingManager; + private final Configuration conf; private final FileSystem fs; // The paths to the latest log of each wal group, for new coming peers @@ -169,9 +172,8 @@ public class ReplicationSourceManager implements ReplicationListener { public ReplicationSourceManager(ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf, Server server, 
FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId, - WALFileLengthProvider walFileLengthProvider) throws IOException { - // CopyOnWriteArrayList is thread-safe. - // Generally, reading is more than modifying. + WALFileLengthProvider walFileLengthProvider, + SyncReplicationPeerMappingManager syncReplicationPeerMappingManager) throws IOException { this.sources = new ConcurrentHashMap<>(); this.queueStorage = queueStorage; this.replicationPeers = replicationPeers; @@ -184,10 +186,11 @@ public class ReplicationSourceManager implements ReplicationListener { this.fs = fs; this.logDir = logDir; this.oldLogDir = oldLogDir; - this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000); // 30 - // seconds + // 30 seconds + this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000); this.clusterId = clusterId; this.walFileLengthProvider = walFileLengthProvider; + this.syncReplicationPeerMappingManager = syncReplicationPeerMappingManager; this.replicationTracker.registerListener(this); // It's preferable to failover 1 RS at a time, but with good zk servers // more could be processed at the same time. @@ -248,8 +251,11 @@ public class ReplicationSourceManager implements ReplicationListener { } /** - * 1. Add peer to replicationPeers 2. Add the normal source and related replication queue 3. Add - * HFile Refs + * <ol> + * <li>Add peer to replicationPeers</li> + * <li>Add the normal source and related replication queue</li> + * <li>Add HFile Refs</li> + * </ol> * @param peerId the id of replication peer */ public void addPeer(String peerId) throws IOException { @@ -268,13 +274,16 @@ public class ReplicationSourceManager implements ReplicationListener { } /** - * 1. Remove peer for replicationPeers 2. Remove all the recovered sources for the specified id - * and related replication queues 3. Remove the normal source and related replication queue 4. 
- * Remove HFile Refs + * <ol> + * <li>Remove peer for replicationPeers</li> + * <li>Remove all the recovered sources for the specified id and related replication queues</li> + * <li>Remove the normal source and related replication queue</li> + * <li>Remove HFile Refs</li> + * </ol> * @param peerId the id of the replication peer */ public void removePeer(String peerId) { - replicationPeers.removePeer(peerId); + ReplicationPeer peer = replicationPeers.removePeer(peerId); String terminateMessage = "Replication stream was removed by a user"; List<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<>(); // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer @@ -305,7 +314,10 @@ public class ReplicationSourceManager implements ReplicationListener { deleteQueue(peerId); this.walsById.remove(peerId); } - + ReplicationPeerConfig peerConfig = peer.getPeerConfig(); + if (peerConfig.isSyncReplication()) { + syncReplicationPeerMappingManager.remove(peerId, peerConfig); + } // Remove HFile Refs abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(peerId)); } @@ -357,6 +369,10 @@ public class ReplicationSourceManager implements ReplicationListener { } } } + ReplicationPeerConfig peerConfig = peer.getPeerConfig(); + if (peerConfig.isSyncReplication()) { + syncReplicationPeerMappingManager.add(peer.getId(), peerConfig); + } src.startup(); return src; } @@ -434,6 +450,7 @@ public class ReplicationSourceManager implements ReplicationListener { // Delete queue from storage and memory deleteQueue(src.getQueueId()); this.walsById.remove(src.getQueueId()); + } /**