HBASE-19957 General framework to transit sync replication state

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/58b5849f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/58b5849f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/58b5849f

Branch: refs/heads/HBASE-19064
Commit: 58b5849feae6aa0103c8751707fd63a4a10009fd
Parents: e935a4c
Author: zhangduo <zhang...@apache.org>
Authored: Fri Feb 9 18:33:28 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Fri May 25 10:11:48 2018 +0800

----------------------------------------------------------------------
 .../replication/ReplicationPeerConfig.java      |   2 -
 .../replication/ReplicationPeerDescription.java |   5 +-
 .../hbase/replication/SyncReplicationState.java |  19 +-
 .../org/apache/hadoop/hbase/HConstants.java     |   3 +
 .../src/main/protobuf/MasterProcedure.proto     |  20 +-
 .../hbase/replication/ReplicationPeerImpl.java  |  45 ++++-
 .../replication/ReplicationPeerStorage.java     |  25 ++-
 .../hbase/replication/ReplicationPeers.java     |  27 ++-
 .../replication/ZKReplicationPeerStorage.java   |  63 +++++--
 .../hbase/coprocessor/MasterObserver.java       |   7 +-
 .../org/apache/hadoop/hbase/master/HMaster.java |   4 +-
 .../hbase/master/MasterCoprocessorHost.java     |  12 +-
 .../replication/AbstractPeerProcedure.java      |  14 +-
 .../master/replication/ModifyPeerProcedure.java |  11 --
 .../replication/RefreshPeerProcedure.java       |  18 +-
 .../replication/ReplicationPeerManager.java     |  89 +++++----
 ...ransitPeerSyncReplicationStateProcedure.java | 181 ++++++++++++-------
 .../hbase/regionserver/HRegionServer.java       |  35 ++--
 .../regionserver/ReplicationSourceService.java  |  11 +-
 .../regionserver/PeerActionListener.java        |   4 +-
 .../regionserver/PeerProcedureHandler.java      |  16 +-
 .../regionserver/PeerProcedureHandlerImpl.java  |  52 +++++-
 .../regionserver/RefreshPeerCallable.java       |   7 +
 .../replication/regionserver/Replication.java   |  22 ++-
 .../regionserver/ReplicationSourceManager.java  |  41 +++--
 .../SyncReplicationPeerInfoProvider.java        |  43 +++++
 .../SyncReplicationPeerInfoProviderImpl.java    |  71 ++++++++
 .../SyncReplicationPeerMappingManager.java      |  48 +++++
 .../SyncReplicationPeerProvider.java            |  35 ----
 .../hbase/wal/SyncReplicationWALProvider.java   |  35 ++--
 .../org/apache/hadoop/hbase/wal/WALFactory.java |  47 ++---
 .../replication/TestReplicationAdmin.java       |   3 +-
 .../TestReplicationSourceManager.java           |   5 +-
 .../wal/TestSyncReplicationWALProvider.java     |  36 ++--
 34 files changed, 743 insertions(+), 313 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index 997a155..cc7b4bc 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.replication;
 
 import java.util.Collection;
@@ -25,7 +24,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;

http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java
index 2d077c5..b0c27bb 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java
@@ -20,7 +20,10 @@ package org.apache.hadoop.hbase.replication;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
- * The POJO equivalent of ReplicationProtos.ReplicationPeerDescription
+ * The POJO equivalent of ReplicationProtos.ReplicationPeerDescription.
+ * <p>
+ * To developer, here we do not store the new sync replication state since it 
is just an
+ * intermediate state and this class is public.
  */
 @InterfaceAudience.Public
 public class ReplicationPeerDescription {

http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java
index a65b144..de9576c 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java
@@ -29,14 +29,19 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
 /**
  * Used by synchronous replication. Indicate the state of the current cluster 
in a synchronous
  * replication peer. The state may be one of {@link 
SyncReplicationState#ACTIVE},
- * {@link SyncReplicationState#DOWNGRADE_ACTIVE} or
- * {@link SyncReplicationState#STANDBY}.
+ * {@link SyncReplicationState#DOWNGRADE_ACTIVE} or {@link 
SyncReplicationState#STANDBY}.
  * <p>
  * For asynchronous replication, the state is {@link 
SyncReplicationState#NONE}.
  */
 @InterfaceAudience.Public
 public enum SyncReplicationState {
-  NONE, ACTIVE, DOWNGRADE_ACTIVE, STANDBY;
+  NONE(0), ACTIVE(1), DOWNGRADE_ACTIVE(2), STANDBY(3);
+
+  private final byte value;
+
+  private SyncReplicationState(int value) {
+    this.value = (byte) value;
+  }
 
   public static SyncReplicationState valueOf(int value) {
     switch (value) {
@@ -53,13 +58,17 @@ public enum SyncReplicationState {
     }
   }
 
+  public int value() {
+    return value & 0xFF;
+  }
+
   public static byte[] toByteArray(SyncReplicationState state) {
     return ProtobufUtil
-        
.prependPBMagic(ReplicationPeerConfigUtil.toSyncReplicationState(state).toByteArray());
+      
.prependPBMagic(ReplicationPeerConfigUtil.toSyncReplicationState(state).toByteArray());
   }
 
   public static SyncReplicationState parseFrom(byte[] bytes) throws 
InvalidProtocolBufferException {
     return 
ReplicationPeerConfigUtil.toSyncReplicationState(ReplicationProtos.SyncReplicationState
-        .parseFrom(Arrays.copyOfRange(bytes, ProtobufUtil.lengthOfPBMagic(), 
bytes.length)));
+      .parseFrom(Arrays.copyOfRange(bytes, ProtobufUtil.lengthOfPBMagic(), 
bytes.length)));
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 9241682..522c2cf 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1355,6 +1355,9 @@ public final class HConstants {
 
   public static final String NOT_IMPLEMENTED = "Not implemented";
 
+  // TODO: need to find a better place to hold it.
+  public static final String SYNC_REPLICATION_ENABLED = 
"hbase.replication.sync.enabled";
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto 
b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index 39fc72a..67c1b43 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -389,6 +389,17 @@ enum PeerModificationState {
   POST_PEER_MODIFICATION = 8;
 }
 
+enum PeerSyncReplicationStateTransitionState {
+  PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION = 1;
+  SET_PEER_NEW_SYNC_REPLICATION_STATE = 2;
+  REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN = 3;
+  REPLAY_REMOTE_WAL_IN_PEER = 4;
+  REOPEN_ALL_REGIONS_IN_PEER = 5;
+  TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE = 6;
+  REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END = 7;
+  POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 8;
+}
+
 message PeerModificationStateData {
   required string peer_id = 1;
 }
@@ -399,18 +410,23 @@ enum PeerModificationType {
   ENABLE_PEER = 3;
   DISABLE_PEER = 4;
   UPDATE_PEER_CONFIG = 5;
+  TRANSIT_SYNC_REPLICATION_STATE = 6;
 }
 
 message RefreshPeerStateData {
   required string peer_id = 1;
   required PeerModificationType type = 2;
   required ServerName target_server = 3;
+    /** We need multiple stages for sync replication state transition **/
+  optional uint32 stage = 4 [default = 0];
 }
 
 message RefreshPeerParameter {
   required string peer_id = 1;
   required PeerModificationType type = 2;
   required ServerName target_server = 3;
+  /** We need multiple stages for sync replication state transition **/
+  optional uint32 stage = 4 [default = 0];;
 }
 
 message PeerProcedureStateData {
@@ -438,5 +454,7 @@ message DisablePeerStateData {
 }
 
 message TransitPeerSyncReplicationStateStateData {
-  required SyncReplicationState syncReplicationState = 1;
+  /** Could be null if we fail in pre check, so optional */
+  optional SyncReplicationState fromState = 1;
+  required SyncReplicationState toState = 2;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
index ff3f662..22026e5 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
@@ -36,7 +37,14 @@ public class ReplicationPeerImpl implements ReplicationPeer {
 
   private volatile PeerState peerState;
 
-  private volatile SyncReplicationState syncReplicationState;
+  // The lower 16 bits are the current sync replication state, the higher 16 
bits are the new sync
+  // replication state. Embedded in one int so user can not get an 
inconsistency view of state and
+  // new state.
+  private volatile int syncReplicationStateBits;
+
+  private static final int SHIFT = 16;
+
+  private static final int AND_BITS = 0xFFFF;
 
   private final List<ReplicationPeerConfigListener> peerConfigListeners;
 
@@ -48,12 +56,14 @@ public class ReplicationPeerImpl implements ReplicationPeer 
{
    * @param peerConfig configuration for the replication peer
    */
   public ReplicationPeerImpl(Configuration conf, String id, 
ReplicationPeerConfig peerConfig,
-      boolean peerState, SyncReplicationState syncReplicationState) {
+      boolean peerState, SyncReplicationState syncReplicationState,
+      SyncReplicationState newSyncReplicationState) {
     this.conf = conf;
     this.id = id;
     this.peerState = peerState ? PeerState.ENABLED : PeerState.DISABLED;
     this.peerConfig = peerConfig;
-    this.syncReplicationState = syncReplicationState;
+    this.syncReplicationStateBits =
+      syncReplicationState.value() | (newSyncReplicationState.value() << 
SHIFT);
     this.peerConfigListeners = new ArrayList<>();
   }
 
@@ -66,6 +76,16 @@ public class ReplicationPeerImpl implements ReplicationPeer {
     peerConfigListeners.forEach(listener -> 
listener.peerConfigUpdated(peerConfig));
   }
 
+  public void setNewSyncReplicationState(SyncReplicationState newState) {
+    this.syncReplicationStateBits =
+      (this.syncReplicationStateBits & AND_BITS) | (newState.value() << SHIFT);
+  }
+
+  public void transitSyncReplicationState() {
+    this.syncReplicationStateBits =
+      (this.syncReplicationStateBits >>> SHIFT) | 
(SyncReplicationState.NONE.value() << SHIFT);
+  }
+
   /**
    * Get the identifier of this peer
    * @return string representation of the id (short)
@@ -80,9 +100,26 @@ public class ReplicationPeerImpl implements ReplicationPeer 
{
     return peerState;
   }
 
+  private static SyncReplicationState getSyncReplicationState(int bits) {
+    return SyncReplicationState.valueOf(bits & AND_BITS);
+  }
+
+  private static SyncReplicationState getNewSyncReplicationState(int bits) {
+    return SyncReplicationState.valueOf(bits >>> SHIFT);
+  }
+
+  public Pair<SyncReplicationState, SyncReplicationState> 
getSyncReplicationStateAndNewState() {
+    int bits = this.syncReplicationStateBits;
+    return Pair.newPair(getSyncReplicationState(bits), 
getNewSyncReplicationState(bits));
+  }
+
+  public SyncReplicationState getNewSyncReplicationState() {
+    return getNewSyncReplicationState(syncReplicationStateBits);
+  }
+
   @Override
   public SyncReplicationState getSyncReplicationState() {
-    return syncReplicationState;
+    return getSyncReplicationState(syncReplicationStateBits);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
index d2538ab..f74ac37 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.replication;
 
 import java.util.List;
-
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -72,16 +71,30 @@ public interface ReplicationPeerStorage {
   ReplicationPeerConfig getPeerConfig(String peerId) throws 
ReplicationException;
 
   /**
-   * Set the state of current cluster in a synchronous replication peer.
+   * Set the new sync replication state that we are going to transit to.
    * @throws ReplicationException if there are errors accessing the storage 
service.
    */
-  void setPeerSyncReplicationState(String peerId, SyncReplicationState state)
+  void setPeerNewSyncReplicationState(String peerId, SyncReplicationState 
state)
       throws ReplicationException;
 
   /**
-   * Get the state of current cluster in a synchronous replication peer.
+   * Overwrite the sync replication state with the new sync replication state 
which is set with the
+   * {@link #setPeerNewSyncReplicationState(String, SyncReplicationState)} 
method above, and clear
+   * the new sync replication state.
    * @throws ReplicationException if there are errors accessing the storage 
service.
    */
-  SyncReplicationState getPeerSyncReplicationState(String peerId)
-      throws ReplicationException;
+  void transitPeerSyncReplicationState(String peerId) throws 
ReplicationException;
+
+  /**
+   * Get the sync replication state.
+   * @throws ReplicationException if there are errors accessing the storage 
service.
+   */
+  SyncReplicationState getPeerSyncReplicationState(String peerId) throws 
ReplicationException;
+
+  /**
+   * Get the new sync replication state. Will return {@link 
SyncReplicationState#NONE} if we are
+   * not in a transition.
+   * @throws ReplicationException if there are errors accessing the storage 
service.
+   */
+  SyncReplicationState getPeerNewSyncReplicationState(String peerId) throws 
ReplicationException;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index a54f339..ba6da7a 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -80,8 +80,8 @@ public class ReplicationPeers {
     return true;
   }
 
-  public void removePeer(String peerId) {
-    peerCache.remove(peerId);
+  public ReplicationPeerImpl removePeer(String peerId) {
+    return peerCache.remove(peerId);
   }
 
   /**
@@ -110,22 +110,29 @@ public class ReplicationPeers {
 
   public PeerState refreshPeerState(String peerId) throws ReplicationException 
{
     ReplicationPeerImpl peer = peerCache.get(peerId);
-    if (peer == null) {
-      throw new ReplicationException("Peer with id=" + peerId + " is not 
cached.");
-    }
     peer.setPeerState(peerStorage.isPeerEnabled(peerId));
     return peer.getPeerState();
   }
 
   public ReplicationPeerConfig refreshPeerConfig(String peerId) throws 
ReplicationException {
     ReplicationPeerImpl peer = peerCache.get(peerId);
-    if (peer == null) {
-      throw new ReplicationException("Peer with id=" + peerId + " is not 
cached.");
-    }
     peer.setPeerConfig(peerStorage.getPeerConfig(peerId));
     return peer.getPeerConfig();
   }
 
+  public SyncReplicationState refreshPeerNewSyncReplicationState(String peerId)
+      throws ReplicationException {
+    ReplicationPeerImpl peer = peerCache.get(peerId);
+    SyncReplicationState newState = 
peerStorage.getPeerNewSyncReplicationState(peerId);
+    peer.setNewSyncReplicationState(newState);
+    return newState;
+  }
+
+  public void transitPeerSyncReplicationState(String peerId) {
+    ReplicationPeerImpl peer = peerCache.get(peerId);
+    peer.transitSyncReplicationState();
+  }
+
   /**
    * Helper method to connect to a peer
    * @param peerId peer's identifier
@@ -135,7 +142,9 @@ public class ReplicationPeers {
     ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
     boolean enabled = peerStorage.isPeerEnabled(peerId);
     SyncReplicationState syncReplicationState = 
peerStorage.getPeerSyncReplicationState(peerId);
+    SyncReplicationState newSyncReplicationState =
+      peerStorage.getPeerNewSyncReplicationState(peerId);
     return new 
ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, 
conf),
-        peerId, peerConfig, enabled, syncReplicationState);
+      peerId, peerConfig, enabled, syncReplicationState, 
newSyncReplicationState);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
index 9107cf6..a2cdfdf 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
@@ -53,7 +53,12 @@ public class ZKReplicationPeerStorage extends 
ZKReplicationStorageBase
   public static final byte[] DISABLED_ZNODE_BYTES =
     toByteArray(ReplicationProtos.ReplicationState.State.DISABLED);
 
-  public static final String SYNCHRONOUS_REPLICATION_STATE_ZNODE = 
"sync-rep-state";
+  public static final String SYNC_REPLICATION_STATE_ZNODE = "sync-rep-state";
+
+  public static final String NEW_SYNC_REPLICATION_STATE_ZNODE = 
"new-sync-rep-state";
+
+  public static final byte[] NONE_STATE_ZNODE_BYTES =
+    SyncReplicationState.toByteArray(SyncReplicationState.NONE);
 
   /**
    * The name of the znode that contains the replication status of a remote 
slave (i.e. peer)
@@ -85,7 +90,11 @@ public class ZKReplicationPeerStorage extends 
ZKReplicationStorageBase
 
   @VisibleForTesting
   public String getSyncReplicationStateNode(String peerId) {
-    return ZNodePaths.joinZNode(getPeerNode(peerId), 
SYNCHRONOUS_REPLICATION_STATE_ZNODE);
+    return ZNodePaths.joinZNode(getPeerNode(peerId), 
SYNC_REPLICATION_STATE_ZNODE);
+  }
+
+  private String getNewSyncReplicationStateNode(String peerId) {
+    return ZNodePaths.joinZNode(getPeerNode(peerId), 
NEW_SYNC_REPLICATION_STATE_ZNODE);
   }
 
   @Override
@@ -97,14 +106,15 @@ public class ZKReplicationPeerStorage extends 
ZKReplicationStorageBase
       ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId),
         enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES),
       ZKUtilOp.createAndFailSilent(getSyncReplicationStateNode(peerId),
-        SyncReplicationState.toByteArray(syncReplicationState)));
+        SyncReplicationState.toByteArray(syncReplicationState)),
+      ZKUtilOp.createAndFailSilent(getNewSyncReplicationStateNode(peerId), 
NONE_STATE_ZNODE_BYTES));
     try {
       ZKUtil.createWithParents(zookeeper, peersZNode);
       ZKUtil.multiOrSequential(zookeeper, multiOps, false);
     } catch (KeeperException e) {
       throw new ReplicationException(
         "Could not add peer with id=" + peerId + ", peerConfig=>" + peerConfig 
+ ", state=" +
-            (enabled ? "ENABLED" : "DISABLED") + ", syncReplicationState=" + 
syncReplicationState,
+          (enabled ? "ENABLED" : "DISABLED") + ", syncReplicationState=" + 
syncReplicationState,
         e);
     }
   }
@@ -136,7 +146,7 @@ public class ZKReplicationPeerStorage extends 
ZKReplicationStorageBase
         ReplicationPeerConfigUtil.toByteArray(peerConfig));
     } catch (KeeperException e) {
       throw new ReplicationException(
-          "There was a problem trying to save changes to the " + "replication 
peer " + peerId, e);
+        "There was a problem trying to save changes to the " + "replication 
peer " + peerId, e);
     }
   }
 
@@ -170,38 +180,63 @@ public class ZKReplicationPeerStorage extends 
ZKReplicationStorageBase
     }
     if (data == null || data.length == 0) {
       throw new ReplicationException(
-          "Replication peer config data shouldn't be empty, peerId=" + peerId);
+        "Replication peer config data shouldn't be empty, peerId=" + peerId);
     }
     try {
       return ReplicationPeerConfigUtil.parsePeerFrom(data);
     } catch (DeserializationException e) {
       throw new ReplicationException(
-          "Failed to parse replication peer config for peer with id=" + 
peerId, e);
+        "Failed to parse replication peer config for peer with id=" + peerId, 
e);
     }
   }
 
   @Override
-  public void setPeerSyncReplicationState(String peerId, SyncReplicationState 
state)
+  public void setPeerNewSyncReplicationState(String peerId, 
SyncReplicationState state)
       throws ReplicationException {
     try {
-      ZKUtil.setData(zookeeper, getSyncReplicationStateNode(peerId),
+      ZKUtil.createSetData(zookeeper, getNewSyncReplicationStateNode(peerId),
         SyncReplicationState.toByteArray(state));
     } catch (KeeperException e) {
       throw new ReplicationException(
-        "Unable to change the cluster state for the synchronous replication 
peer with id=" + peerId,
-        e);
+        "Unable to set the new sync replication state for peer with id=" + 
peerId, e);
     }
   }
 
   @Override
-  public SyncReplicationState getPeerSyncReplicationState(String peerId)
+  public void transitPeerSyncReplicationState(String peerId) throws 
ReplicationException {
+    String newStateNode = getNewSyncReplicationStateNode(peerId);
+    try {
+      byte[] data = ZKUtil.getData(zookeeper, newStateNode);
+      ZKUtil.multiOrSequential(zookeeper,
+        Arrays.asList(ZKUtilOp.setData(newStateNode, NONE_STATE_ZNODE_BYTES),
+          ZKUtilOp.setData(getSyncReplicationStateNode(peerId), data)),
+        false);
+    } catch (KeeperException | InterruptedException e) {
+      throw new ReplicationException(
+        "Error transiting sync replication state for peer with id=" + peerId, 
e);
+    }
+  }
+
+  private SyncReplicationState getSyncReplicationState(String peerId, String 
path)
       throws ReplicationException {
     try {
-      byte[] data = ZKUtil.getData(zookeeper, 
getSyncReplicationStateNode(peerId));
+      byte[] data = ZKUtil.getData(zookeeper, path);
       return SyncReplicationState.parseFrom(data);
     } catch (KeeperException | InterruptedException | IOException e) {
       throw new ReplicationException(
-        "Error getting cluster state for the synchronous replication peer with 
id=" + peerId, e);
+        "Error getting sync replication state of path " + path + " for peer 
with id=" + peerId, e);
     }
   }
+
+  @Override
+  public SyncReplicationState getPeerNewSyncReplicationState(String peerId)
+      throws ReplicationException {
+    return getSyncReplicationState(peerId, 
getNewSyncReplicationStateNode(peerId));
+  }
+
+  @Override
+  public SyncReplicationState getPeerSyncReplicationState(String peerId)
+      throws ReplicationException {
+    return getSyncReplicationState(peerId, 
getSyncReplicationStateNode(peerId));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
index 1fc0f37..a0aba3b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
@@ -1334,7 +1334,7 @@ public interface MasterObserver {
    * Called before transit current cluster state for the specified synchronous 
replication peer
    * @param ctx the environment to interact with the framework and master
    * @param peerId a short name that identifies the peer
-   * @param state a new state
+   * @param state the new state
    */
   default void preTransitReplicationPeerSyncReplicationState(
       final ObserverContext<MasterCoprocessorEnvironment> ctx, String peerId,
@@ -1345,11 +1345,12 @@ public interface MasterObserver {
    * Called after transit current cluster state for the specified synchronous 
replication peer
    * @param ctx the environment to interact with the framework and master
    * @param peerId a short name that identifies the peer
-   * @param state a new state
+   * @param from the old state
+   * @param to the new state
    */
   default void postTransitReplicationPeerSyncReplicationState(
       final ObserverContext<MasterCoprocessorEnvironment> ctx, String peerId,
-      SyncReplicationState state) throws IOException {
+      SyncReplicationState from, SyncReplicationState to) throws IOException {
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 8129f9e..e7e585d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -130,10 +130,10 @@ import 
org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
 import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure;
 import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
+import org.apache.hadoop.hbase.master.replication.AbstractPeerProcedure;
 import org.apache.hadoop.hbase.master.replication.AddPeerProcedure;
 import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure;
 import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure;
-import org.apache.hadoop.hbase.master.replication.ModifyPeerProcedure;
 import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
 import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
 import 
org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStateProcedure;
@@ -3421,7 +3421,7 @@ public class HMaster extends HRegionServer implements 
MasterServices {
     return favoredNodesManager;
   }
 
-  private long executePeerProcedure(ModifyPeerProcedure procedure) throws 
IOException {
+  private long executePeerProcedure(AbstractPeerProcedure<?> procedure) throws 
IOException {
     long procId = procedureExecutor.submitProcedure(procedure);
     procedure.getLatch().await();
     return procId;

http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
index af57096..56ae2c0 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
@@ -1535,22 +1535,22 @@ public class MasterCoprocessorHost
     });
   }
 
-  public void preTransitReplicationPeerSyncReplicationState(final String 
peerId,
-      final SyncReplicationState clusterState) throws IOException {
+  public void preTransitReplicationPeerSyncReplicationState(String peerId,
+      SyncReplicationState state) throws IOException {
     execOperation(coprocEnvironments.isEmpty() ? null : new 
MasterObserverOperation() {
       @Override
       public void call(MasterObserver observer) throws IOException {
-        observer.preTransitReplicationPeerSyncReplicationState(this, peerId, 
clusterState);
+        observer.preTransitReplicationPeerSyncReplicationState(this, peerId, 
state);
       }
     });
   }
 
-  public void postTransitReplicationPeerSyncReplicationState(final String 
peerId,
-      final SyncReplicationState clusterState) throws IOException {
+  public void postTransitReplicationPeerSyncReplicationState(String peerId,
+      SyncReplicationState from, SyncReplicationState to) throws IOException {
     execOperation(coprocEnvironments.isEmpty() ? null : new 
MasterObserverOperation() {
       @Override
       public void call(MasterObserver observer) throws IOException {
-        observer.postTransitReplicationPeerSyncReplicationState(this, peerId, 
clusterState);
+        observer.postTransitReplicationPeerSyncReplicationState(this, peerId, 
from, to);
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
index 0ad8a63..6679d78 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
@@ -46,7 +46,7 @@ public abstract class AbstractPeerProcedure<TState>
 
   protected AbstractPeerProcedure(String peerId) {
     this.peerId = peerId;
-    this.latch = ProcedurePrepareLatch.createLatch(2, 0);
+    this.latch = ProcedurePrepareLatch.createLatch(2, 1);
   }
 
   public ProcedurePrepareLatch getLatch() {
@@ -94,4 +94,16 @@ public abstract class AbstractPeerProcedure<TState>
     super.deserializeStateData(serializer);
     peerId = serializer.deserialize(PeerProcedureStateData.class).getPeerId();
   }
+
+  @Override
+  protected void rollbackState(MasterProcedureEnv env, TState state)
+      throws IOException, InterruptedException {
+    if (state == getInitialState()) {
+      // actually the peer related operations has no rollback, but if we 
haven't done any
+      // modifications on the peer storage yet, we can just return.
+      return;
+    }
+    throw new UnsupportedOperationException();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
index ea2e314..32b8ea1 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
@@ -328,17 +328,6 @@ public abstract class ModifyPeerProcedure extends 
AbstractPeerProcedure<PeerModi
   }
 
   @Override
-  protected void rollbackState(MasterProcedureEnv env, PeerModificationState 
state)
-      throws IOException, InterruptedException {
-    if (state == PeerModificationState.PRE_PEER_MODIFICATION) {
-      // actually the peer related operations has no rollback, but if we 
haven't done any
-      // modifications on the peer storage yet, we can just return.
-      return;
-    }
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
   protected PeerModificationState getState(int stateId) {
     return PeerModificationState.forNumber(stateId);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
index ba9bcdc..d51ea63 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
@@ -54,6 +54,8 @@ public class RefreshPeerProcedure extends 
Procedure<MasterProcedureEnv>
       justification = "Will never change after construction")
   private ServerName targetServer;
 
+  private int stage;
+
   private boolean dispatched;
 
   private ProcedureEvent<?> event;
@@ -64,9 +66,15 @@ public class RefreshPeerProcedure extends 
Procedure<MasterProcedureEnv>
   }
 
   public RefreshPeerProcedure(String peerId, PeerOperationType type, 
ServerName targetServer) {
+    this(peerId, type, targetServer, 0);
+  }
+
+  public RefreshPeerProcedure(String peerId, PeerOperationType type, 
ServerName targetServer,
+      int stage) {
     this.peerId = peerId;
     this.type = type;
     this.targetServer = targetServer;
+    this.stage = stage;
   }
 
   @Override
@@ -91,6 +99,8 @@ public class RefreshPeerProcedure extends 
Procedure<MasterProcedureEnv>
         return PeerModificationType.DISABLE_PEER;
       case UPDATE_CONFIG:
         return PeerModificationType.UPDATE_PEER_CONFIG;
+      case TRANSIT_SYNC_REPLICATION_STATE:
+        return PeerModificationType.TRANSIT_SYNC_REPLICATION_STATE;
       default:
         throw new IllegalArgumentException("Unknown type: " + type);
     }
@@ -108,6 +118,8 @@ public class RefreshPeerProcedure extends 
Procedure<MasterProcedureEnv>
         return PeerOperationType.DISABLE;
       case UPDATE_PEER_CONFIG:
         return PeerOperationType.UPDATE_CONFIG;
+      case TRANSIT_SYNC_REPLICATION_STATE:
+        return PeerOperationType.TRANSIT_SYNC_REPLICATION_STATE;
       default:
         throw new IllegalArgumentException("Unknown type: " + type);
     }
@@ -118,7 +130,8 @@ public class RefreshPeerProcedure extends 
Procedure<MasterProcedureEnv>
     assert targetServer.equals(remote);
     return new ServerOperation(this, getProcId(), RefreshPeerCallable.class,
         
RefreshPeerParameter.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type))
-            
.setTargetServer(ProtobufUtil.toServerName(remote)).build().toByteArray());
+            
.setTargetServer(ProtobufUtil.toServerName(remote)).setStage(stage).build()
+            .toByteArray());
   }
 
   private void complete(MasterProcedureEnv env, Throwable error) {
@@ -193,7 +206,7 @@ public class RefreshPeerProcedure extends 
Procedure<MasterProcedureEnv>
   protected void serializeStateData(ProcedureStateSerializer serializer) 
throws IOException {
     serializer.serialize(
       
RefreshPeerStateData.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type))
-          .setTargetServer(ProtobufUtil.toServerName(targetServer)).build());
+          
.setTargetServer(ProtobufUtil.toServerName(targetServer)).setStage(stage).build());
   }
 
   @Override
@@ -202,5 +215,6 @@ public class RefreshPeerProcedure extends 
Procedure<MasterProcedureEnv>
     peerId = data.getPeerId();
     type = toPeerOperationType(data.getType());
     targetServer = ProtobufUtil.toServerName(data.getTargetServer());
+    stage = data.getStage();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index ff778a8..0dc922d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master.replication;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.EnumMap;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
@@ -50,6 +49,9 @@ import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+
 /**
  * Manages and performs all replication admin operations.
  * <p>
@@ -64,15 +66,11 @@ public class ReplicationPeerManager {
 
   private final ConcurrentMap<String, ReplicationPeerDescription> peers;
 
-  private final EnumMap<SyncReplicationState, EnumSet<SyncReplicationState>> 
allowedTransition =
-    new EnumMap<SyncReplicationState, 
EnumSet<SyncReplicationState>>(SyncReplicationState.class) {
-      {
-        put(SyncReplicationState.ACTIVE, 
EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE));
-        put(SyncReplicationState.STANDBY, 
EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE));
-        put(SyncReplicationState.DOWNGRADE_ACTIVE,
-          EnumSet.of(SyncReplicationState.STANDBY, 
SyncReplicationState.ACTIVE));
-      }
-    };
+  private final ImmutableMap<SyncReplicationState, 
EnumSet<SyncReplicationState>>
+    allowedTransition = 
Maps.immutableEnumMap(ImmutableMap.of(SyncReplicationState.ACTIVE,
+      EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE), 
SyncReplicationState.STANDBY,
+      EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE), 
SyncReplicationState.DOWNGRADE_ACTIVE,
+      EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE)));
 
   ReplicationPeerManager(ReplicationPeerStorage peerStorage, 
ReplicationQueueStorage queueStorage,
       ConcurrentMap<String, ReplicationPeerDescription> peers) {
@@ -165,9 +163,9 @@ public class ReplicationPeerManager {
 
     if (!isStringEquals(peerConfig.getRemoteWALDir(), 
oldPeerConfig.getRemoteWALDir())) {
       throw new DoNotRetryIOException(
-          "Changing the remote wal dir on an existing peer is not allowed. 
Existing remote wal " +
-              "dir '" + oldPeerConfig.getRemoteWALDir() + "' for peer " + 
peerId +
-              " does not match new remote wal dir '" + 
peerConfig.getRemoteWALDir() + "'");
+        "Changing the remote wal dir on an existing peer is not allowed. 
Existing remote wal " +
+          "dir '" + oldPeerConfig.getRemoteWALDir() + "' for peer " + peerId +
+          " does not match new remote wal dir '" + 
peerConfig.getRemoteWALDir() + "'");
     }
 
     if (oldPeerConfig.isSyncReplication()) {
@@ -180,15 +178,19 @@ public class ReplicationPeerManager {
     return desc;
   }
 
-  public void preTransitPeerSyncReplicationState(String peerId, 
SyncReplicationState state)
-      throws DoNotRetryIOException {
+  /**
+   * @return the old state.
+   */
+  public SyncReplicationState preTransitPeerSyncReplicationState(String peerId,
+      SyncReplicationState state) throws DoNotRetryIOException {
     ReplicationPeerDescription desc = checkPeerExists(peerId);
     SyncReplicationState fromState = desc.getSyncReplicationState();
     EnumSet<SyncReplicationState> allowedToStates = 
allowedTransition.get(fromState);
     if (allowedToStates == null || !allowedToStates.contains(state)) {
       throw new DoNotRetryIOException("Can not transit current cluster state 
from " + fromState +
-          " to " + state + " for peer id=" + peerId);
+        " to " + state + " for peer id=" + peerId);
     }
+    return fromState;
   }
 
   public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean 
enabled)
@@ -199,8 +201,8 @@ public class ReplicationPeerManager {
     }
     ReplicationPeerConfig copiedPeerConfig = 
ReplicationPeerConfig.newBuilder(peerConfig).build();
     SyncReplicationState syncReplicationState =
-        copiedPeerConfig.isSyncReplication() ? 
SyncReplicationState.DOWNGRADE_ACTIVE
-            : SyncReplicationState.NONE;
+      copiedPeerConfig.isSyncReplication() ? 
SyncReplicationState.DOWNGRADE_ACTIVE
+        : SyncReplicationState.NONE;
     peerStorage.addPeer(peerId, copiedPeerConfig, enabled, 
syncReplicationState);
     peers.put(peerId,
       new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig, 
syncReplicationState));
@@ -240,7 +242,7 @@ public class ReplicationPeerManager {
     ReplicationPeerDescription desc = peers.get(peerId);
     ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
     ReplicationPeerConfigBuilder newPeerConfigBuilder =
-        ReplicationPeerConfig.newBuilder(peerConfig);
+      ReplicationPeerConfig.newBuilder(peerConfig);
     // we need to use the new conf to overwrite the old one.
     newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration());
     newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration());
@@ -257,7 +259,7 @@ public class ReplicationPeerManager {
       return new ArrayList<>(peers.values());
     }
     return peers.values().stream().filter(r -> 
pattern.matcher(r.getPeerId()).matches())
-        .collect(Collectors.toList());
+      .collect(Collectors.toList());
   }
 
   public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) {
@@ -269,12 +271,23 @@ public class ReplicationPeerManager {
     queueStorage.removeLastSequenceIds(peerId);
   }
 
-  public void transitPeerSyncReplicationState(String peerId, 
SyncReplicationState state)
+  public void setPeerNewSyncReplicationState(String peerId, 
SyncReplicationState state)
       throws ReplicationException {
+    peerStorage.setPeerNewSyncReplicationState(peerId, state);
+  }
+
+  public void transitPeerSyncReplicationState(String peerId, 
SyncReplicationState newState)
+      throws ReplicationException {
+    if (peerStorage.getPeerNewSyncReplicationState(peerId) != 
SyncReplicationState.NONE) {
+      // Only transit if this is not a retry
+      peerStorage.transitPeerSyncReplicationState(peerId);
+    }
     ReplicationPeerDescription desc = peers.get(peerId);
-    peerStorage.setPeerSyncReplicationState(peerId, state);
-    peers.put(peerId,
-      new ReplicationPeerDescription(peerId, desc.isEnabled(), 
desc.getPeerConfig(), state));
+    if (desc.getSyncReplicationState() != newState) {
+      // Only recreate the desc if this is not a retry
+      peers.put(peerId,
+        new ReplicationPeerDescription(peerId, desc.isEnabled(), 
desc.getPeerConfig(), newState));
+    }
   }
 
   public void removeAllQueuesAndHFileRefs(String peerId) throws 
ReplicationException {
@@ -301,10 +314,10 @@ public class ReplicationPeerManager {
       // If replicate_all flag is true, it means all user tables will be 
replicated to peer cluster.
       // Then allow config exclude namespaces or exclude table-cfs which can't 
be replicated to peer
       // cluster.
-      if ((peerConfig.getNamespaces() != null && 
!peerConfig.getNamespaces().isEmpty())
-          || (peerConfig.getTableCFsMap() != null && 
!peerConfig.getTableCFsMap().isEmpty())) {
-        throw new DoNotRetryIOException("Need clean namespaces or table-cfs 
config firstly "
-            + "when you want replicate all cluster");
+      if ((peerConfig.getNamespaces() != null && 
!peerConfig.getNamespaces().isEmpty()) ||
+        (peerConfig.getTableCFsMap() != null && 
!peerConfig.getTableCFsMap().isEmpty())) {
+        throw new DoNotRetryIOException("Need clean namespaces or table-cfs 
config firstly " +
+          "when you want replicate all cluster");
       }
       
checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
         peerConfig.getExcludeTableCFsMap());
@@ -312,13 +325,13 @@ public class ReplicationPeerManager {
       // If replicate_all flag is false, it means all user tables can't be 
replicated to peer
       // cluster. Then allow to config namespaces or table-cfs which will be 
replicated to peer
       // cluster.
-      if ((peerConfig.getExcludeNamespaces() != null
-          && !peerConfig.getExcludeNamespaces().isEmpty())
-          || (peerConfig.getExcludeTableCFsMap() != null
-              && !peerConfig.getExcludeTableCFsMap().isEmpty())) {
+      if ((peerConfig.getExcludeNamespaces() != null &&
+        !peerConfig.getExcludeNamespaces().isEmpty()) ||
+        (peerConfig.getExcludeTableCFsMap() != null &&
+          !peerConfig.getExcludeTableCFsMap().isEmpty())) {
         throw new DoNotRetryIOException(
-            "Need clean exclude-namespaces or exclude-table-cfs config firstly"
-                + " when replicate_all flag is false");
+          "Need clean exclude-namespaces or exclude-table-cfs config firstly" +
+            " when replicate_all flag is false");
       }
       checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
         peerConfig.getTableCFsMap());
@@ -338,11 +351,11 @@ public class ReplicationPeerManager {
     // TODO: Add namespace, replicat_all flag back
     if (peerConfig.replicateAllUserTables()) {
       throw new DoNotRetryIOException(
-          "Only support replicated table config for sync replication peer");
+        "Only support replicated table config for sync replication peer");
     }
     if (peerConfig.getNamespaces() != null && 
!peerConfig.getNamespaces().isEmpty()) {
       throw new DoNotRetryIOException(
-          "Only support replicated table config for sync replication peer");
+        "Only support replicated table config for sync replication peer");
     }
     if (peerConfig.getTableCFsMap() == null || 
peerConfig.getTableCFsMap().isEmpty()) {
       throw new DoNotRetryIOException("Need config replicated tables for sync 
replication peer");
@@ -350,7 +363,7 @@ public class ReplicationPeerManager {
     for (List<String> cfs : peerConfig.getTableCFsMap().values()) {
       if (cfs != null && !cfs.isEmpty()) {
         throw new DoNotRetryIOException(
-            "Only support replicated table config for sync replication peer");
+          "Only support replicated table config for sync replication peer");
       }
     }
   }
@@ -394,7 +407,7 @@ public class ReplicationPeerManager {
   private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
       throws DoNotRetryIOException {
     String filterCSV = peerConfig.getConfiguration()
-        .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
+      .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
     if (filterCSV != null && !filterCSV.isEmpty()) {
       String[] filters = filterCSV.split(",");
       for (String filter : filters) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
index aad3b06..8fc932f 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
@@ -18,11 +18,12 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
-
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
 import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
@@ -32,26 +33,29 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerSyncReplicationStateTransitionState;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.TransitPeerSyncReplicationStateStateData;
 
 /**
- * The procedure for transit current cluster state for a synchronous 
replication peer.
+ * The procedure for transit current sync replication state for a synchronous 
replication peer.
  */
 @InterfaceAudience.Private
-public class TransitPeerSyncReplicationStateProcedure extends 
ModifyPeerProcedure {
+public class TransitPeerSyncReplicationStateProcedure
+    extends AbstractPeerProcedure<PeerSyncReplicationStateTransitionState> {
 
   private static final Logger LOG =
     LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class);
 
-  private SyncReplicationState state;
+  private SyncReplicationState fromState;
+
+  private SyncReplicationState toState;
 
   public TransitPeerSyncReplicationStateProcedure() {
   }
 
   public TransitPeerSyncReplicationStateProcedure(String peerId, 
SyncReplicationState state) {
     super(peerId);
-    this.state = state;
+    this.toState = state;
   }
 
   @Override
@@ -60,99 +64,154 @@ public class TransitPeerSyncReplicationStateProcedure 
extends ModifyPeerProcedur
   }
 
   @Override
-  protected void prePeerModification(MasterProcedureEnv env)
-      throws IOException, ReplicationException {
-    MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
-    if (cpHost != null) {
-      cpHost.preTransitReplicationPeerSyncReplicationState(peerId, state);
+  protected void serializeStateData(ProcedureStateSerializer serializer) 
throws IOException {
+    super.serializeStateData(serializer);
+    TransitPeerSyncReplicationStateStateData.Builder builder =
+      TransitPeerSyncReplicationStateStateData.newBuilder()
+        .setToState(ReplicationPeerConfigUtil.toSyncReplicationState(toState));
+    if (fromState != null) {
+      
builder.setFromState(ReplicationPeerConfigUtil.toSyncReplicationState(fromState));
     }
-    env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, 
state);
+    serializer.serialize(builder.build());
   }
 
   @Override
-  protected void updatePeerStorage(MasterProcedureEnv env) throws 
ReplicationException {
-    env.getReplicationPeerManager().transitPeerSyncReplicationState(peerId, 
state);
+  protected void deserializeStateData(ProcedureStateSerializer serializer) 
throws IOException {
+    super.deserializeStateData(serializer);
+    TransitPeerSyncReplicationStateStateData data =
+      serializer.deserialize(TransitPeerSyncReplicationStateStateData.class);
+    toState = 
ReplicationPeerConfigUtil.toSyncReplicationState(data.getToState());
+    if (data.hasFromState()) {
+      fromState = 
ReplicationPeerConfigUtil.toSyncReplicationState(data.getFromState());
+    }
   }
 
   @Override
-  protected void postPeerModification(MasterProcedureEnv env)
-    throws IOException, ReplicationException {
-    LOG.info("Successfully transit current cluster state to {} in synchronous 
replication peer {}",
-      state, peerId);
+  protected PeerSyncReplicationStateTransitionState getState(int stateId) {
+    return PeerSyncReplicationStateTransitionState.forNumber(stateId);
+  }
+
+  @Override
+  protected int getStateId(PeerSyncReplicationStateTransitionState state) {
+    return state.getNumber();
+  }
+
+  @Override
+  protected PeerSyncReplicationStateTransitionState getInitialState() {
+    return 
PeerSyncReplicationStateTransitionState.PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION;
+  }
+
+  private void preTransit(MasterProcedureEnv env) throws IOException {
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
     if (cpHost != null) {
-      
env.getMasterCoprocessorHost().postTransitReplicationPeerSyncReplicationState(peerId,
 state);
+      cpHost.preTransitReplicationPeerSyncReplicationState(peerId, toState);
     }
+    fromState = 
env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, 
toState);
   }
 
-  @Override
-  protected void serializeStateData(ProcedureStateSerializer serializer) 
throws IOException {
-    super.serializeStateData(serializer);
-    serializer.serialize(TransitPeerSyncReplicationStateStateData.newBuilder()
-        
.setSyncReplicationState(ReplicationPeerConfigUtil.toSyncReplicationState(state)).build());
+  private void postTransit(MasterProcedureEnv env) throws IOException {
+    LOG.info(
+      "Successfully transit current cluster state from {} to {} for sync 
replication peer {}",
+      fromState, toState, peerId);
+    MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+    if (cpHost != null) {
+      
env.getMasterCoprocessorHost().postTransitReplicationPeerSyncReplicationState(peerId,
+        fromState, toState);
+    }
   }
 
-  @Override
-  protected void deserializeStateData(ProcedureStateSerializer serializer) 
throws IOException {
-    super.deserializeStateData(serializer);
-    TransitPeerSyncReplicationStateStateData data =
-        serializer.deserialize(TransitPeerSyncReplicationStateStateData.class);
-    state = 
ReplicationPeerConfigUtil.toSyncReplicationState(data.getSyncReplicationState());
+  private List<RegionInfo> getRegionsToReopen(MasterProcedureEnv env) {
+    return 
env.getReplicationPeerManager().getPeerConfig(peerId).get().getTableCFsMap().keySet()
+      .stream()
+      .flatMap(tn -> 
env.getAssignmentManager().getRegionStates().getRegionsOfTable(tn).stream())
+      .collect(Collectors.toList());
   }
 
   @Override
-  protected Flow executeFromState(MasterProcedureEnv env, 
PeerModificationState state)
+  protected Flow executeFromState(MasterProcedureEnv env,
+      PeerSyncReplicationStateTransitionState state)
       throws ProcedureSuspendedException, ProcedureYieldException, 
InterruptedException {
     switch (state) {
-      case PRE_PEER_MODIFICATION:
+      case PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION:
         try {
-          prePeerModification(env);
+          preTransit(env);
         } catch (IOException e) {
-          LOG.warn("{} failed to call pre CP hook or the pre check is failed 
for peer {}, " +
-            "mark the procedure as failure and give up", getClass().getName(), 
peerId, e);
-          setFailure("master-" + getPeerOperationType().name().toLowerCase() + 
"-peer", e);
-          releaseLatch();
+          LOG.warn("Failed to call pre CP hook or the pre check is failed for 
peer {} " +
+            "when transiting sync replication peer state to {}, " +
+            "mark the procedure as failure and give up", peerId, toState, e);
+          setFailure("master-transit-peer-sync-replication-state", e);
           return Flow.NO_MORE_STATE;
-        } catch (ReplicationException e) {
-          LOG.warn("{} failed to call prePeerModification for peer {}, retry", 
getClass().getName(),
-            peerId, e);
-          throw new ProcedureYieldException();
         }
-        setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
+        
setNextState(PeerSyncReplicationStateTransitionState.SET_PEER_NEW_SYNC_REPLICATION_STATE);
         return Flow.HAS_MORE_STATE;
-      case UPDATE_PEER_STORAGE:
+      case SET_PEER_NEW_SYNC_REPLICATION_STATE:
         try {
-          updatePeerStorage(env);
+          
env.getReplicationPeerManager().setPeerNewSyncReplicationState(peerId, toState);
         } catch (ReplicationException e) {
-          LOG.warn("{} update peer storage for peer {} failed, retry", 
getClass().getName(), peerId,
-            e);
+          LOG.warn("Failed to update peer storage for peer {} when starting 
transiting sync " +
+            "replication peer state from {} to {}, retry", peerId, fromState, 
toState, e);
           throw new ProcedureYieldException();
         }
-        setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
+        setNextState(
+          
PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN);
+        return Flow.HAS_MORE_STATE;
+      case REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN:
+        
addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
+          .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), 
sn, 0))
+          .toArray(RefreshPeerProcedure[]::new));
+        if (fromState == SyncReplicationState.STANDBY &&
+          toState == SyncReplicationState.DOWNGRADE_ACTIVE) {
+          
setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
+        } else {
+          
setNextState(PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
+        }
+        return Flow.HAS_MORE_STATE;
+      case REPLAY_REMOTE_WAL_IN_PEER:
+        // TODO: replay remote wal when transiting from S to DA.
+        
setNextState(PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
         return Flow.HAS_MORE_STATE;
-      case REFRESH_PEER_ON_RS:
-        // TODO: Need add child procedure for every RegionServer
-        setNextState(PeerModificationState.POST_PEER_MODIFICATION);
+      case REOPEN_ALL_REGIONS_IN_PEER:
+        try {
+          addChildProcedure(
+            
env.getAssignmentManager().createReopenProcedures(getRegionsToReopen(env)));
+        } catch (IOException e) {
+          LOG.warn("Failed to schedule region reopen for peer {} when starting 
transiting sync " +
+            "replication peer state from {} to {}, retry", peerId, fromState, 
toState, e);
+          throw new ProcedureYieldException();
+        }
+        setNextState(
+          
PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
         return Flow.HAS_MORE_STATE;
-      case POST_PEER_MODIFICATION:
+      case TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE:
         try {
-          postPeerModification(env);
+          
env.getReplicationPeerManager().transitPeerSyncReplicationState(peerId, 
toState);
         } catch (ReplicationException e) {
-          LOG.warn("{} failed to call postPeerModification for peer {}, retry",
-            getClass().getName(), peerId, e);
+          LOG.warn("Failed to update peer storage for peer {} when ending 
transiting sync " +
+            "replication peer state from {} to {}, retry", peerId, fromState, 
toState, e);
           throw new ProcedureYieldException();
+        }
+        setNextState(
+          
PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END);
+        return Flow.HAS_MORE_STATE;
+      case REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END:
+        
addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
+          .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), 
sn, 1))
+          .toArray(RefreshPeerProcedure[]::new));
+        setNextState(
+          
PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
+      case POST_PEER_SYNC_REPLICATION_STATE_TRANSITION:
+        try {
+          postTransit(env);
         } catch (IOException e) {
-          LOG.warn("{} failed to call post CP hook for peer {}, " +
-            "ignore since the procedure has already done", 
getClass().getName(), peerId, e);
+          LOG.warn(
+            "Failed to call post CP hook for peer {} when transiting sync 
replication " +
+              "peer state from {} to {}, ignore since the procedure has 
already done",
+            peerId, fromState, toState, e);
         }
-        releaseLatch();
         return Flow.NO_MORE_STATE;
       default:
         throw new UnsupportedOperationException("unhandled state=" + state);
     }
   }
 
-  private void releaseLatch() {
-    ProcedurePrepareLatch.releaseLatch(latch, this);
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index a17b402..9c23750 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1801,21 +1801,27 @@ public class HRegionServer extends HasThread implements
    * be hooked up to WAL.
    */
   private void setupWALAndReplication() throws IOException {
+    boolean isMasterNoTableOrSystemTableOnly = this instanceof HMaster &&
+      (!LoadBalancer.isTablesOnMaster(conf) || 
LoadBalancer.isSystemTablesOnlyOnMaster(conf));
+    if (isMasterNoTableOrSystemTableOnly) {
+      conf.setBoolean(HConstants.SYNC_REPLICATION_ENABLED, false);
+    }
     WALFactory factory = new WALFactory(conf, serverName.toString());
+    if (!isMasterNoTableOrSystemTableOnly) {
+      // TODO Replication make assumptions here based on the default 
filesystem impl
+      Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+      String logName = 
AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
 
-    // TODO Replication make assumptions here based on the default filesystem 
impl
-    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
-    String logName = 
AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
-
-    Path logDir = new Path(walRootDir, logName);
-    LOG.debug("logDir={}", logDir);
-    if (this.walFs.exists(logDir)) {
-      throw new RegionServerRunningException(
-          "Region server has already created directory at " + 
this.serverName.toString());
+      Path logDir = new Path(walRootDir, logName);
+      LOG.debug("logDir={}", logDir);
+      if (this.walFs.exists(logDir)) {
+        throw new RegionServerRunningException(
+            "Region server has already created directory at " + 
this.serverName.toString());
+      }
+      // Instantiate replication if replication enabled. Pass it the log 
directories.
+      createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir,
+        factory.getWALProvider());
     }
-    // Instantiate replication if replication enabled. Pass it the log 
directories.
-    createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir,
-      factory.getWALProvider());
     this.walFactory = factory;
   }
 
@@ -2937,11 +2943,6 @@ public class HRegionServer extends HasThread implements
    */
   private static void createNewReplicationInstance(Configuration conf, 
HRegionServer server,
       FileSystem walFs, Path walDir, Path oldWALDir, WALProvider walProvider) 
throws IOException {
-    if ((server instanceof HMaster) &&
-      (!LoadBalancer.isTablesOnMaster(conf) || 
LoadBalancer.isSystemTablesOnlyOnMaster(conf))) {
-      return;
-    }
-
     // read in the name of the source replication class from the config file.
     String sourceClassname = 
conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
       HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);

http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
index 23ba773..4529943 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
@@ -18,17 +18,22 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import org.apache.hadoop.hbase.replication.regionserver.PeerProcedureHandler;
+import 
org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
- * A source for a replication stream has to expose this service.
- * This service allows an application to hook into the
- * regionserver and watch for new transactions.
+ * A source for a replication stream has to expose this service. This service 
allows an application
+ * to hook into the regionserver and watch for new transactions.
  */
 @InterfaceAudience.Private
 public interface ReplicationSourceService extends ReplicationService {
 
   /**
+   * Returns an info provider for sync replication peer.
+   */
+  SyncReplicationPeerInfoProvider getSyncReplicationPeerInfoProvider();
+
+  /**
    * Returns a Handler to handle peer procedures.
    */
   PeerProcedureHandler getPeerProcedureHandler();

http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java
index 6df2af9..efafd09 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java
@@ -28,8 +28,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public interface PeerActionListener {
 
-  default void peerRemoved(String peerId) {}
+  static final PeerActionListener DUMMY = new PeerActionListener() {};
 
   default void peerSyncReplicationStateChange(String peerId, 
SyncReplicationState from,
-      SyncReplicationState to) {}
+      SyncReplicationState to, int stage) {}
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
index 65da9af..52b604b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
@@ -15,11 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
-
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -29,13 +28,16 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public interface PeerProcedureHandler {
 
-  public void addPeer(String peerId) throws ReplicationException, IOException;
+  void addPeer(String peerId) throws ReplicationException, IOException;
+
+  void removePeer(String peerId) throws ReplicationException, IOException;
 
-  public void removePeer(String peerId) throws ReplicationException, 
IOException;
+  void disablePeer(String peerId) throws ReplicationException, IOException;
 
-  public void disablePeer(String peerId) throws ReplicationException, 
IOException;
+  void enablePeer(String peerId) throws ReplicationException, IOException;
 
-  public void enablePeer(String peerId) throws ReplicationException, 
IOException;
+  void updatePeerConfig(String peerId) throws ReplicationException, 
IOException;
 
-  public void updatePeerConfig(String peerId) throws ReplicationException, 
IOException;
+  void transitSyncReplicationPeerState(String peerId, int stage, HRegionServer 
rs)
+      throws ReplicationException, IOException;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
index 78c1977..7fc9f53 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
@@ -19,23 +19,32 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
 import java.util.concurrent.locks.Lock;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.util.KeyLocker;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @InterfaceAudience.Private
 public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(PeerProcedureHandlerImpl.class);
+
   private final ReplicationSourceManager replicationSourceManager;
+  private final PeerActionListener peerActionListener;
   private final KeyLocker<String> peersLock = new KeyLocker<>();
 
-  public PeerProcedureHandlerImpl(ReplicationSourceManager 
replicationSourceManager) {
+  public PeerProcedureHandlerImpl(ReplicationSourceManager 
replicationSourceManager,
+      PeerActionListener peerActionListener) {
     this.replicationSourceManager = replicationSourceManager;
+    this.peerActionListener = peerActionListener;
   }
 
   @Override
@@ -61,7 +70,6 @@ public class PeerProcedureHandlerImpl implements 
PeerProcedureHandler {
   }
 
   private void refreshPeerState(String peerId) throws ReplicationException, 
IOException {
-    PeerState newState;
     Lock peerLock = peersLock.acquireLock(peerId);
     ReplicationPeerImpl peer = null;
     PeerState oldState = null;
@@ -72,7 +80,7 @@ public class PeerProcedureHandlerImpl implements 
PeerProcedureHandler {
         throw new ReplicationException("Peer with id=" + peerId + " is not 
cached.");
       }
       oldState = peer.getPeerState();
-      newState = 
replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
+      PeerState newState = 
replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
       // RS need to start work with the new replication state change
       if (oldState.equals(PeerState.ENABLED) && 
newState.equals(PeerState.DISABLED)) {
         replicationSourceManager.refreshSources(peerId);
@@ -132,4 +140,42 @@ public class PeerProcedureHandlerImpl implements 
PeerProcedureHandler {
       peerLock.unlock();
     }
   }
+
+  @Override
+  public void transitSyncReplicationPeerState(String peerId, int stage, 
HRegionServer rs)
+      throws ReplicationException, IOException {
+    ReplicationPeers replicationPeers = 
replicationSourceManager.getReplicationPeers();
+    Lock peerLock = peersLock.acquireLock(peerId);
+    try {
+      ReplicationPeerImpl peer = replicationPeers.getPeer(peerId);
+      if (peer == null) {
+        throw new ReplicationException("Peer with id=" + peerId + " is not 
cached.");
+      }
+      if (!peer.getPeerConfig().isSyncReplication()) {
+        throw new ReplicationException("Peer with id=" + peerId + " is not 
synchronous.");
+      }
+      SyncReplicationState newState = peer.getNewSyncReplicationState();
+      if (stage == 0) {
+        if (newState != SyncReplicationState.NONE) {
+          LOG.warn("The new sync replication state for peer {} has already 
been set to {}, " +
+            "this should be a retry, give up", peerId, newState);
+          return;
+        }
+        newState = replicationPeers.refreshPeerNewSyncReplicationState(peerId);
+        SyncReplicationState oldState = peer.getSyncReplicationState();
+        peerActionListener.peerSyncReplicationStateChange(peerId, oldState, 
newState, stage);
+      } else {
+        if (newState == SyncReplicationState.NONE) {
+          LOG.warn("The new sync replication state for peer {} has already 
been clear, and the " +
+            "current state is {}, this should be a retry, give up", peerId, 
newState);
+          return;
+        }
+        SyncReplicationState oldState = peer.getSyncReplicationState();
+        peerActionListener.peerSyncReplicationStateChange(peerId, oldState, 
newState, stage);
+        peer.transitSyncReplicationState();
+      }
+    } finally {
+      peerLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
index 7ada24b..8fe16bc 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
@@ -35,12 +35,15 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.R
 public class RefreshPeerCallable implements RSProcedureCallable {
 
   private static final Logger LOG = 
Logger.getLogger(RefreshPeerCallable.class);
+
   private HRegionServer rs;
 
   private String peerId;
 
   private PeerModificationType type;
 
+  private int stage;
+
   private Exception initError;
 
   @Override
@@ -67,6 +70,9 @@ public class RefreshPeerCallable implements 
RSProcedureCallable {
       case UPDATE_PEER_CONFIG:
         handler.updatePeerConfig(this.peerId);
         break;
+      case TRANSIT_SYNC_REPLICATION_STATE:
+        handler.transitSyncReplicationPeerState(peerId, stage, rs);
+        break;
       default:
         throw new IllegalArgumentException("Unknown peer modification type: " 
+ type);
     }
@@ -80,6 +86,7 @@ public class RefreshPeerCallable implements 
RSProcedureCallable {
       RefreshPeerParameter param = RefreshPeerParameter.parseFrom(parameter);
       this.peerId = param.getPeerId();
       this.type = param.getType();
+      this.stage = param.getStage();
     } catch (InvalidProtocolBufferException e) {
       initError = e;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 8290ac3..2846d2c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -41,6 +41,7 @@ import 
org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
 import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -66,6 +67,7 @@ public class Replication implements ReplicationSourceService, 
ReplicationSinkSer
   private ReplicationTracker replicationTracker;
   private Configuration conf;
   private ReplicationSink replicationSink;
+  private SyncReplicationPeerInfoProvider syncReplicationPeerInfoProvider;
   // Hosting server
   private Server server;
   /** Statistics thread schedule pool */
@@ -120,19 +122,30 @@ public class Replication implements 
ReplicationSourceService, ReplicationSinkSer
     } catch (KeeperException ke) {
       throw new IOException("Could not read cluster id", ke);
     }
+    SyncReplicationPeerMappingManager mapping = new 
SyncReplicationPeerMappingManager();
     this.replicationManager = new ReplicationSourceManager(queueStorage, 
replicationPeers,
         replicationTracker, conf, this.server, fs, logDir, oldLogDir, 
clusterId,
-        walProvider != null ? walProvider.getWALFileLengthProvider() : p -> 
OptionalLong.empty());
+        walProvider != null ? walProvider.getWALFileLengthProvider() : p -> 
OptionalLong.empty(),
+        mapping);
+    this.syncReplicationPeerInfoProvider =
+        new SyncReplicationPeerInfoProviderImpl(replicationPeers, mapping);
+    PeerActionListener peerActionListener = PeerActionListener.DUMMY;
     if (walProvider != null) {
       walProvider
         .addWALActionsListener(new ReplicationSourceWALActionListener(conf, 
replicationManager));
+      if (walProvider instanceof SyncReplicationWALProvider) {
+        SyncReplicationWALProvider syncWALProvider = 
(SyncReplicationWALProvider) walProvider;
+        peerActionListener = syncWALProvider;
+        syncWALProvider.setPeerInfoProvider(syncReplicationPeerInfoProvider);
+      }
     }
     this.statsThreadPeriod =
         this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
     LOG.debug("Replication stats-in-log period={} seconds",  
this.statsThreadPeriod);
     this.replicationLoad = new ReplicationLoad();
 
-    this.peerProcedureHandler = new 
PeerProcedureHandlerImpl(replicationManager);
+    this.peerProcedureHandler =
+      new PeerProcedureHandlerImpl(replicationManager, peerActionListener);
   }
 
   @Override
@@ -270,4 +283,9 @@ public class Replication implements 
ReplicationSourceService, ReplicationSinkSer
     MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
     this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics);
   }
+
+  @Override
+  public SyncReplicationPeerInfoProvider getSyncReplicationPeerInfoProvider() {
+    return syncReplicationPeerInfoProvider;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58b5849f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 9b4a22c..4212597 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -51,6 +51,7 @@ import 
org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationListener;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
@@ -135,6 +136,8 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   // For recovered source, the queue id's format is peer_id-servername-*
   private final ConcurrentMap<String, Map<String, NavigableSet<String>>> 
walsByIdRecoveredQueues;
 
+  private final SyncReplicationPeerMappingManager 
syncReplicationPeerMappingManager;
+
   private final Configuration conf;
   private final FileSystem fs;
   // The paths to the latest log of each wal group, for new coming peers
@@ -169,9 +172,8 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
       ReplicationPeers replicationPeers, ReplicationTracker 
replicationTracker, Configuration conf,
       Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID 
clusterId,
-      WALFileLengthProvider walFileLengthProvider) throws IOException {
-    // CopyOnWriteArrayList is thread-safe.
-    // Generally, reading is more than modifying.
+      WALFileLengthProvider walFileLengthProvider,
+      SyncReplicationPeerMappingManager syncReplicationPeerMappingManager) 
throws IOException {
     this.sources = new ConcurrentHashMap<>();
     this.queueStorage = queueStorage;
     this.replicationPeers = replicationPeers;
@@ -184,10 +186,11 @@ public class ReplicationSourceManager implements 
ReplicationListener {
     this.fs = fs;
     this.logDir = logDir;
     this.oldLogDir = oldLogDir;
-    this.sleepBeforeFailover = 
conf.getLong("replication.sleep.before.failover", 30000); // 30
-                                                                               
          // seconds
+    // 30 seconds
+    this.sleepBeforeFailover = 
conf.getLong("replication.sleep.before.failover", 30000);
     this.clusterId = clusterId;
     this.walFileLengthProvider = walFileLengthProvider;
+    this.syncReplicationPeerMappingManager = syncReplicationPeerMappingManager;
     this.replicationTracker.registerListener(this);
     // It's preferable to failover 1 RS at a time, but with good zk servers
     // more could be processed at the same time.
@@ -248,8 +251,11 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   }
 
   /**
-   * 1. Add peer to replicationPeers 2. Add the normal source and related 
replication queue 3. Add
-   * HFile Refs
+   * <ol>
+   * <li>Add peer to replicationPeers</li>
+   * <li>Add the normal source and related replication queue</li>
+   * <li>Add HFile Refs</li>
+   * </ol>
    * @param peerId the id of replication peer
    */
   public void addPeer(String peerId) throws IOException {
@@ -268,13 +274,16 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   }
 
   /**
-   * 1. Remove peer for replicationPeers 2. Remove all the recovered sources 
for the specified id
-   * and related replication queues 3. Remove the normal source and related 
replication queue 4.
-   * Remove HFile Refs
+   * <ol>
+   * <li>Remove peer for replicationPeers</li>
+   * <li>Remove all the recovered sources for the specified id and related 
replication queues</li>
+   * <li>Remove the normal source and related replication queue</li>
+   * <li>Remove HFile Refs</li>
+   * </ol>
    * @param peerId the id of the replication peer
    */
   public void removePeer(String peerId) {
-    replicationPeers.removePeer(peerId);
+    ReplicationPeer peer = replicationPeers.removePeer(peerId);
     String terminateMessage = "Replication stream was removed by a user";
     List<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<>();
     // synchronized on oldsources to avoid adding recovered source for the 
to-be-removed peer
@@ -305,7 +314,10 @@ public class ReplicationSourceManager implements 
ReplicationListener {
       deleteQueue(peerId);
       this.walsById.remove(peerId);
     }
-
+    ReplicationPeerConfig peerConfig = peer.getPeerConfig();
+    if (peerConfig.isSyncReplication()) {
+      syncReplicationPeerMappingManager.remove(peerId, peerConfig);
+    }
     // Remove HFile Refs
     abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(peerId));
   }
@@ -357,6 +369,10 @@ public class ReplicationSourceManager implements 
ReplicationListener {
         }
       }
     }
+    ReplicationPeerConfig peerConfig = peer.getPeerConfig();
+    if (peerConfig.isSyncReplication()) {
+      syncReplicationPeerMappingManager.add(peer.getId(), peerConfig);
+    }
     src.startup();
     return src;
   }
@@ -434,6 +450,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
     // Delete queue from storage and memory
     deleteQueue(src.getQueueId());
     this.walsById.remove(src.getQueueId());
+
   }
 
   /**

Reply via email to