HBASE-20569 NPE in RecoverStandbyProcedure.execute

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/44ca13fe
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/44ca13fe
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/44ca13fe

Branch: refs/heads/master
Commit: 44ca13fe07dc5050a2bc98ccd3f65953f06aaef8
Parents: 7448b04
Author: Guanghao Zhang <zg...@apache.org>
Authored: Thu May 31 20:54:59 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Thu Jun 28 18:08:43 2018 +0800

----------------------------------------------------------------------
 .../src/main/protobuf/MasterProcedure.proto     |  26 ++-
 .../org/apache/hadoop/hbase/master/HMaster.java |  10 +-
 .../hadoop/hbase/master/MasterServices.java     |   6 +-
 .../procedure/MasterProcedureScheduler.java     |   3 +-
 .../procedure/PeerProcedureInterface.java       |   2 +-
 .../hbase/master/procedure/PeerQueue.java       |   3 +-
 .../replication/RecoverStandbyProcedure.java    |  68 ++++--
 .../master/replication/RemovePeerProcedure.java |   5 +-
 .../ReplaySyncReplicationWALManager.java        | 169 --------------
 .../ReplaySyncReplicationWALProcedure.java      | 196 -----------------
 .../SyncReplicationReplayWALManager.java        | 218 +++++++++++++++++++
 .../SyncReplicationReplayWALProcedure.java      | 164 ++++++++++++++
 ...SyncReplicationReplayWALRemoteProcedure.java | 213 ++++++++++++++++++
 ...ransitPeerSyncReplicationStateProcedure.java |   6 +-
 ...ZKSyncReplicationReplayWALWorkerStorage.java | 108 +++++++++
 .../ReplaySyncReplicationWALCallable.java       |  46 ++--
 .../hbase/master/MockNoopMasterServices.java    |   4 +-
 .../replication/SyncReplicationTestBase.java    |   6 +-
 .../TestSyncReplicationStandbyKillMaster.java   |  88 ++++++++
 .../TestSyncReplicationStandbyKillRS.java       | 119 ++++++++++
 .../master/TestRecoverStandbyProcedure.java     |  10 +-
 21 files changed, 1040 insertions(+), 430 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index 23ec8f8..a062e9a 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -486,22 +486,34 @@ message TransitPeerSyncReplicationStateStateData {
 
 enum RecoverStandbyState {
   RENAME_SYNC_REPLICATION_WALS_DIR = 1;
-  INIT_WORKERS = 2;
-  DISPATCH_TASKS = 3;
-  SNAPSHOT_SYNC_REPLICATION_WALS_DIR = 4;
+  REGISTER_PEER_TO_WORKER_STORAGE = 2;
+  DISPATCH_WALS = 3;
+  UNREGISTER_PEER_FROM_WORKER_STORAGE = 4;
+  SNAPSHOT_SYNC_REPLICATION_WALS_DIR = 5;
+}
+
+enum SyncReplicationReplayWALState {
+  ASSIGN_WORKER = 1;
+  DISPATCH_WALS_TO_WORKER = 2;
+  RELEASE_WORKER = 3;
 }
 
 message RecoverStandbyStateData {
+  required bool serial  = 1;
+}
+
+message SyncReplicationReplayWALStateData {
   required string peer_id = 1;
+  repeated string wal = 2;
 }
 
-message ReplaySyncReplicationWALStateData {
+message SyncReplicationReplayWALRemoteStateData {
   required string peer_id = 1;
-  required string wal = 2;
-  optional ServerName target_server = 3;
+  repeated string wal = 2;
+  required ServerName target_server = 3;
 }
 
 message ReplaySyncReplicationWALParameter {
   required string peer_id = 1;
-  required string wal = 2;
+  repeated string wal = 2;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 2c23e85..dc62752 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -138,8 +138,8 @@ import org.apache.hadoop.hbase.master.replication.AddPeerProcedure;
 import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure;
 import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure;
 import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
-import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager;
 import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
+import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager;
 import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStateProcedure;
 import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
@@ -343,7 +343,7 @@ public class HMaster extends HRegionServer implements MasterServices {
   // manager of replication
   private ReplicationPeerManager replicationPeerManager;
 
-  private ReplaySyncReplicationWALManager replaySyncReplicationWALManager;
+  private SyncReplicationReplayWALManager syncReplicationReplayWALManager;
 
   // buffer for "fatal error" notices from region servers
   // in the cluster. This is only used for assisting
@@ -754,6 +754,7 @@ public class HMaster extends HRegionServer implements MasterServices {
     this.splitOrMergeTracker.start();
 
     this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf);
+    this.syncReplicationReplayWALManager = new SyncReplicationReplayWALManager(this);
 
     this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
     this.drainingServerTracker.start();
@@ -852,7 +853,6 @@ public class HMaster extends HRegionServer implements MasterServices {
     initializeMemStoreChunkCreator();
     this.fileSystemManager = new MasterFileSystem(conf);
     this.walManager = new MasterWalManager(this);
-    this.replaySyncReplicationWALManager = new ReplaySyncReplicationWALManager(this);
 
     // enable table descriptors cache
     this.tableDescriptors.setCacheOn();
@@ -3764,7 +3764,7 @@ public class HMaster extends HRegionServer implements MasterServices {
   }
 
   @Override
-  public ReplaySyncReplicationWALManager getReplaySyncReplicationWALManager() {
-    return this.replaySyncReplicationWALManager;
+  public SyncReplicationReplayWALManager getSyncReplicationReplayWALManager() {
+    return this.syncReplicationReplayWALManager;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index 6034ff7..7b0c56a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -38,8 +38,8 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
 import org.apache.hadoop.hbase.master.locking.LockManager;
 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager;
 import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
+import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
 import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
 import org.apache.hadoop.hbase.procedure2.LockedResource;
@@ -462,9 +462,9 @@ public interface MasterServices extends Server {
   ReplicationPeerManager getReplicationPeerManager();
 
   /**
-   * Returns the {@link ReplaySyncReplicationWALManager}.
+   * Returns the {@link SyncReplicationReplayWALManager}.
    */
-  ReplaySyncReplicationWALManager getReplaySyncReplicationWALManager();
+  SyncReplicationReplayWALManager getSyncReplicationReplayWALManager();
 
   /**
    * Update the peerConfig for the specified peer

http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
index 1420986..8a28b84 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
@@ -207,7 +207,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
       // check if the next procedure is still a child.
       // if not, remove the rq from the fairq and go back to the xlock state
       Procedure<?> nextProc = rq.peek();
-      if (nextProc != null && !Procedure.haveSameParent(nextProc, pollResult)) {
+      if (nextProc != null && !Procedure.haveSameParent(nextProc, pollResult)
+          && nextProc.getRootProcId() != pollResult.getRootProcId()) {
         removeFromRunQueue(fairq, rq);
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java
index 76b5163..0195ab9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java
@@ -24,7 +24,7 @@ public interface PeerProcedureInterface {
 
   enum PeerOperationType {
     ADD, REMOVE, ENABLE, DISABLE, UPDATE_CONFIG, REFRESH, TRANSIT_SYNC_REPLICATION_STATE,
-    RECOVER_STANDBY, REPLAY_SYNC_REPLICATION_WAL
+    RECOVER_STANDBY, SYNC_REPLICATION_REPLAY_WAL, SYNC_REPLICATION_REPLAY_WAL_REMOTE
   }
 
   String getPeerId();

http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java
index 25feb7e..86d8e43 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java
@@ -50,6 +50,7 @@ class PeerQueue extends Queue<String> {
 
   private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) {
     return proc.getPeerOperationType() != PeerOperationType.REFRESH
-        && proc.getPeerOperationType() != PeerOperationType.REPLAY_SYNC_REPLICATION_WAL;
+        && proc.getPeerOperationType() != PeerOperationType.SYNC_REPLICATION_REPLAY_WAL
+        && proc.getPeerOperationType() != PeerOperationType.SYNC_REPLICATION_REPLAY_WAL_REMOTE;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java
index 9860774..5494742 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java
@@ -18,60 +18,79 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
 import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RecoverStandbyState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RecoverStandbyStateData;
 
 @InterfaceAudience.Private
 public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandbyState> {
 
   private static final Logger LOG = LoggerFactory.getLogger(RecoverStandbyProcedure.class);
 
+  private boolean serial;
+
   public RecoverStandbyProcedure() {
   }
 
-  public RecoverStandbyProcedure(String peerId) {
+  public RecoverStandbyProcedure(String peerId, boolean serial) {
     super(peerId);
+    this.serial = serial;
   }
 
   @Override
   protected Flow executeFromState(MasterProcedureEnv env, RecoverStandbyState state)
       throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
-    ReplaySyncReplicationWALManager replaySyncReplicationWALManager =
-        env.getMasterServices().getReplaySyncReplicationWALManager();
+    SyncReplicationReplayWALManager syncReplicationReplayWALManager =
+        env.getMasterServices().getSyncReplicationReplayWALManager();
     switch (state) {
       case RENAME_SYNC_REPLICATION_WALS_DIR:
         try {
-          replaySyncReplicationWALManager.renameToPeerReplayWALDir(peerId);
+          syncReplicationReplayWALManager.renameToPeerReplayWALDir(peerId);
         } catch (IOException e) {
           LOG.warn("Failed to rename remote wal dir for peer id={}", peerId, 
e);
           setFailure("master-recover-standby", e);
           return Flow.NO_MORE_STATE;
         }
-        setNextState(RecoverStandbyState.INIT_WORKERS);
+        setNextState(RecoverStandbyState.REGISTER_PEER_TO_WORKER_STORAGE);
         return Flow.HAS_MORE_STATE;
-      case INIT_WORKERS:
-        replaySyncReplicationWALManager.initPeerWorkers(peerId);
-        setNextState(RecoverStandbyState.DISPATCH_TASKS);
+      case REGISTER_PEER_TO_WORKER_STORAGE:
+        try {
+          syncReplicationReplayWALManager.registerPeer(peerId);
+        } catch (ReplicationException e) {
+          LOG.warn("Failed to register peer to worker storage for peer id={}, retry", peerId, e);
+          throw new ProcedureYieldException();
+        }
+        setNextState(RecoverStandbyState.DISPATCH_WALS);
         return Flow.HAS_MORE_STATE;
-      case DISPATCH_TASKS:
-        addChildProcedure(getReplayWALs(replaySyncReplicationWALManager).stream()
-            .map(wal -> new ReplaySyncReplicationWALProcedure(peerId,
-                replaySyncReplicationWALManager.removeWALRootPath(wal)))
-            .toArray(ReplaySyncReplicationWALProcedure[]::new));
+      case DISPATCH_WALS:
+        dispathWals(syncReplicationReplayWALManager);
+        setNextState(RecoverStandbyState.UNREGISTER_PEER_FROM_WORKER_STORAGE);
+        return Flow.HAS_MORE_STATE;
+      case UNREGISTER_PEER_FROM_WORKER_STORAGE:
+        try {
+          syncReplicationReplayWALManager.unregisterPeer(peerId);
+        } catch (ReplicationException e) {
+          LOG.warn("Failed to unregister peer from worker storage for peer id={}, retry", peerId,
+              e);
+          throw new ProcedureYieldException();
+        }
         setNextState(RecoverStandbyState.SNAPSHOT_SYNC_REPLICATION_WALS_DIR);
         return Flow.HAS_MORE_STATE;
       case SNAPSHOT_SYNC_REPLICATION_WALS_DIR:
         try {
-          replaySyncReplicationWALManager.renameToPeerSnapshotWALDir(peerId);
+          syncReplicationReplayWALManager.renameToPeerSnapshotWALDir(peerId);
         } catch (IOException e) {
           LOG.warn("Failed to cleanup replay wals dir for peer id={}, , 
retry", peerId, e);
           throw new ProcedureYieldException();
@@ -82,10 +101,14 @@ public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandb
     }
   }
 
-  private List<Path> getReplayWALs(ReplaySyncReplicationWALManager replaySyncReplicationWALManager)
+  // TODO: dispatch wals by region server when serial is true and sort wals
+  private void dispathWals(SyncReplicationReplayWALManager syncReplicationReplayWALManager)
       throws ProcedureYieldException {
     try {
-      return replaySyncReplicationWALManager.getReplayWALsAndCleanUpUnusedFiles(peerId);
+      List<Path> wals = syncReplicationReplayWALManager.getReplayWALsAndCleanUpUnusedFiles(peerId);
+      addChildProcedure(wals.stream().map(wal -> new SyncReplicationReplayWALProcedure(peerId,
+          Arrays.asList(syncReplicationReplayWALManager.removeWALRootPath(wal))))
+          .toArray(SyncReplicationReplayWALProcedure[]::new));
     } catch (IOException e) {
       LOG.warn("Failed to get replay wals for peer id={}, , retry", peerId, e);
       throw new ProcedureYieldException();
@@ -111,4 +134,17 @@ public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandb
   public PeerOperationType getPeerOperationType() {
     return PeerOperationType.RECOVER_STANDBY;
   }
+
+  @Override
+  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    super.serializeStateData(serializer);
+    serializer.serialize(RecoverStandbyStateData.newBuilder().setSerial(serial).build());
+  }
+
+  @Override
+  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    super.deserializeStateData(serializer);
+    RecoverStandbyStateData data = serializer.deserialize(RecoverStandbyStateData.class);
+    serial = data.getSerial();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
index 254448a..4b77c8d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
@@ -67,11 +67,10 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
   }
 
   private void removeRemoteWALs(MasterProcedureEnv env) throws IOException {
-    env.getMasterServices().getReplaySyncReplicationWALManager().removePeerRemoteWALs(peerId);
+    env.getMasterServices().getSyncReplicationReplayWALManager().removePeerRemoteWALs(peerId);
   }
 
-  @Override
-  protected void postPeerModification(MasterProcedureEnv env)
+  @Override  protected void postPeerModification(MasterProcedureEnv env)
       throws IOException, ReplicationException {
     if (peerConfig.isSyncReplication()) {
       removeRemoteWALs(env);

http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
deleted file mode 100644
index 348c134..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.replication;
-
-import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerRemoteWALDir;
-import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerReplayWALDir;
-import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerSnapshotWALDir;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.replication.ReplicationUtils;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
-@InterfaceAudience.Private
-public class ReplaySyncReplicationWALManager {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ReplaySyncReplicationWALManager.class);
-
-  private final MasterServices services;
-
-  private final FileSystem fs;
-
-  private final Path walRootDir;
-
-  private final Path remoteWALDir;
-
-  private final Map<String, BlockingQueue<ServerName>> availServers = new HashMap<>();
-
-  public ReplaySyncReplicationWALManager(MasterServices services) {
-    this.services = services;
-    this.fs = services.getMasterFileSystem().getWALFileSystem();
-    this.walRootDir = services.getMasterFileSystem().getWALRootDir();
-    this.remoteWALDir = new Path(this.walRootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME);
-  }
-
-  public void createPeerRemoteWALDir(String peerId) throws IOException {
-    Path peerRemoteWALDir = getPeerRemoteWALDir(remoteWALDir, peerId);
-    if (!fs.exists(peerRemoteWALDir) && !fs.mkdirs(peerRemoteWALDir)) {
-      throw new IOException("Unable to mkdir " + peerRemoteWALDir);
-    }
-  }
-
-  private void rename(Path src, Path dst, String peerId) throws IOException {
-    if (fs.exists(src)) {
-      deleteDir(dst, peerId);
-      if (!fs.rename(src, dst)) {
-        throw new IOException(
-          "Failed to rename dir from " + src + " to " + dst + " for peer id=" + peerId);
-      }
-      LOG.info("Renamed dir from {} to {} for peer id={}", src, dst, peerId);
-    } else if (!fs.exists(dst)) {
-      throw new IOException(
-        "Want to rename from " + src + " to " + dst + ", but they both do not 
exist");
-    }
-  }
-
-  public void renameToPeerReplayWALDir(String peerId) throws IOException {
-    rename(getPeerRemoteWALDir(remoteWALDir, peerId), 
getPeerReplayWALDir(remoteWALDir, peerId),
-      peerId);
-  }
-
-  public void renameToPeerSnapshotWALDir(String peerId) throws IOException {
-    rename(getPeerReplayWALDir(remoteWALDir, peerId), 
getPeerSnapshotWALDir(remoteWALDir, peerId),
-      peerId);
-  }
-
-  public List<Path> getReplayWALsAndCleanUpUnusedFiles(String peerId) throws 
IOException {
-    Path peerReplayWALDir = getPeerReplayWALDir(remoteWALDir, peerId);
-    for (FileStatus status : fs.listStatus(peerReplayWALDir,
-      p -> p.getName().endsWith(ReplicationUtils.RENAME_WAL_SUFFIX))) {
-      Path src = status.getPath();
-      String srcName = src.getName();
-      String dstName =
-        srcName.substring(0, srcName.length() - 
ReplicationUtils.RENAME_WAL_SUFFIX.length());
-      FSUtils.renameFile(fs, src, new Path(src.getParent(), dstName));
-    }
-    List<Path> wals = new ArrayList<>();
-    for (FileStatus status : fs.listStatus(peerReplayWALDir)) {
-      Path path = status.getPath();
-      if (path.getName().endsWith(ReplicationUtils.SYNC_WAL_SUFFIX)) {
-        wals.add(path);
-      } else {
-        if (!fs.delete(path, true)) {
-          LOG.warn("Can not delete unused file: " + path);
-        }
-      }
-    }
-    return wals;
-  }
-
-  public void snapshotPeerReplayWALDir(String peerId) throws IOException {
-    Path peerReplayWALDir = getPeerReplayWALDir(remoteWALDir, peerId);
-    if (fs.exists(peerReplayWALDir) && !fs.delete(peerReplayWALDir, true)) {
-      throw new IOException(
-          "Failed to remove replay wals dir " + peerReplayWALDir + " for peer 
id=" + peerId);
-    }
-  }
-
-  private void deleteDir(Path dir, String peerId) throws IOException {
-    if (!fs.delete(dir, true) && fs.exists(dir)) {
-      throw new IOException("Failed to remove dir " + dir + " for peer id=" + 
peerId);
-    }
-  }
-
-  public void removePeerRemoteWALs(String peerId) throws IOException {
-    deleteDir(getPeerRemoteWALDir(remoteWALDir, peerId), peerId);
-    deleteDir(getPeerReplayWALDir(remoteWALDir, peerId), peerId);
-    deleteDir(getPeerSnapshotWALDir(remoteWALDir, peerId), peerId);
-  }
-
-  public void initPeerWorkers(String peerId) {
-    BlockingQueue<ServerName> servers = new LinkedBlockingQueue<>();
-    services.getServerManager().getOnlineServers().keySet()
-        .forEach(server -> servers.offer(server));
-    availServers.put(peerId, servers);
-  }
-
-  public ServerName getAvailServer(String peerId, long timeout, TimeUnit unit)
-      throws InterruptedException {
-    return availServers.get(peerId).poll(timeout, unit);
-  }
-
-  public void addAvailServer(String peerId, ServerName server) {
-    availServers.get(peerId).offer(server);
-  }
-
-  public String removeWALRootPath(Path path) {
-    String pathStr = path.toString();
-    // remove the "/" too.
-    return pathStr.substring(walRootDir.toString().length() + 1);
-  }
-
-  @VisibleForTesting
-  public Path getRemoteWALDir() {
-    return remoteWALDir;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java
deleted file mode 100644
index 77fd24d..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.replication;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
-import 
org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
-import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
-import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
-import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
-import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
-import 
org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
-import 
org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
-import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
-import 
org.apache.hadoop.hbase.replication.regionserver.ReplaySyncReplicationWALCallable;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALStateData;
-
-@InterfaceAudience.Private
-public class ReplaySyncReplicationWALProcedure extends 
Procedure<MasterProcedureEnv>
-    implements RemoteProcedure<MasterProcedureEnv, ServerName>, 
PeerProcedureInterface {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ReplaySyncReplicationWALProcedure.class);
-
-  private static final long DEFAULT_WAIT_AVAILABLE_SERVER_TIMEOUT = 10000;
-
-  private String peerId;
-
-  private ServerName targetServer = null;
-
-  private String wal;
-
-  private boolean dispatched;
-
-  private ProcedureEvent<?> event;
-
-  private boolean succ;
-
-  public ReplaySyncReplicationWALProcedure() {
-  }
-
-  public ReplaySyncReplicationWALProcedure(String peerId, String wal) {
-    this.peerId = peerId;
-    this.wal = wal;
-  }
-
-  @Override
-  public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName 
remote) {
-    return new ServerOperation(this, getProcId(), 
ReplaySyncReplicationWALCallable.class,
-        
ReplaySyncReplicationWALParameter.newBuilder().setPeerId(peerId).setWal(wal).build()
-            .toByteArray());
-  }
-
-  @Override
-  public void remoteCallFailed(MasterProcedureEnv env, ServerName remote, 
IOException exception) {
-    complete(env, exception);
-  }
-
-  @Override
-  public void remoteOperationCompleted(MasterProcedureEnv env) {
-    complete(env, null);
-  }
-
-  @Override
-  public void remoteOperationFailed(MasterProcedureEnv env, 
RemoteProcedureException error) {
-    complete(env, error);
-  }
-
-  private void complete(MasterProcedureEnv env, Throwable error) {
-    if (event == null) {
-      LOG.warn("procedure event for {} is null, maybe the procedure is created 
when recovery",
-        getProcId());
-      return;
-    }
-    ReplaySyncReplicationWALManager replaySyncReplicationWALManager =
-        env.getMasterServices().getReplaySyncReplicationWALManager();
-    if (error != null) {
-      LOG.warn("Replay sync replication wal {} on {} failed for peer id={}", 
wal, targetServer,
-        peerId, error);
-      this.succ = false;
-    } else {
-      LOG.warn("Replay sync replication wal {} on {} suceeded for peer id={}", 
wal, targetServer,
-        peerId);
-      this.succ = true;
-      replaySyncReplicationWALManager.addAvailServer(peerId, targetServer);
-    }
-    event.wake(env.getProcedureScheduler());
-    event = null;
-  }
-
-  @Override
-  protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
-      throws ProcedureYieldException, ProcedureSuspendedException, 
InterruptedException {
-    if (dispatched) {
-      if (succ) {
-        return null;
-      }
-      // retry
-      dispatched = false;
-    }
-
-    // Try poll a available server
-    if (targetServer == null) {
-      targetServer = 
env.getMasterServices().getReplaySyncReplicationWALManager()
-          .getAvailServer(peerId, DEFAULT_WAIT_AVAILABLE_SERVER_TIMEOUT, 
TimeUnit.MILLISECONDS);
-      if (targetServer == null) {
-        LOG.info("No available server to replay wal {} for peer id={}, retry", 
wal, peerId);
-        throw new ProcedureYieldException();
-      }
-    }
-
-    // Dispatch task to target server
-    try {
-      env.getRemoteDispatcher().addOperationToNode(targetServer, this);
-    } catch (FailedRemoteDispatchException e) {
-      LOG.info(
-        "Can not add remote operation for replay wal {} on {} for peer id={}, 
" +
-          "this usually because the server is already dead, " + "retry",
-        wal, targetServer, peerId, e);
-      targetServer = null;
-      throw new ProcedureYieldException();
-    }
-    dispatched = true;
-    event = new ProcedureEvent<>(this);
-    event.suspendIfNotReady(this);
-    throw new ProcedureSuspendedException();
-  }
-
-  @Override
-  protected void rollback(MasterProcedureEnv env) throws IOException, 
InterruptedException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  protected boolean abort(MasterProcedureEnv env) {
-    return false;
-  }
-
-  @Override
-  protected void serializeStateData(ProcedureStateSerializer serializer) 
throws IOException {
-    ReplaySyncReplicationWALStateData.Builder builder =
-        
ReplaySyncReplicationWALStateData.newBuilder().setPeerId(peerId).setWal(wal);
-    if (targetServer != null) {
-      builder.setTargetServer(ProtobufUtil.toServerName(targetServer));
-    }
-    serializer.serialize(builder.build());
-  }
-
-  @Override
-  protected void deserializeStateData(ProcedureStateSerializer serializer) 
throws IOException {
-    ReplaySyncReplicationWALStateData data =
-        serializer.deserialize(ReplaySyncReplicationWALStateData.class);
-    peerId = data.getPeerId();
-    wal = data.getWal();
-    if (data.hasTargetServer()) {
-      targetServer = ProtobufUtil.toServerName(data.getTargetServer());
-    }
-  }
-
-  @Override
-  public String getPeerId() {
-    return peerId;
-  }
-
-  @Override
-  public PeerOperationType getPeerOperationType() {
-    return PeerOperationType.REPLAY_SYNC_REPLICATION_WAL;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALManager.java
new file mode 100644
index 0000000..377c9f1
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALManager.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import static org.apache.hadoop.hbase.replication.ReplicationUtils.REMOTE_WAL_REPLAY_SUFFIX;
+import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerRemoteWALDir;
+import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerReplayWALDir;
+import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerSnapshotWALDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+@InterfaceAudience.Private
+public class SyncReplicationReplayWALManager {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SyncReplicationReplayWALManager.class);
+
+  private final MasterServices services;
+
+  private final FileSystem fs;
+
+  private final Path walRootDir;
+
+  private final Path remoteWALDir;
+
+  private final ZKSyncReplicationReplayWALWorkerStorage workerStorage;
+
+  private final Map<String, Set<ServerName>> workers = new HashMap<>();
+
+  private final Object workerLock = new Object();
+
+  public SyncReplicationReplayWALManager(MasterServices services)
+      throws IOException, ReplicationException {
+    this.services = services;
+    this.fs = services.getMasterFileSystem().getWALFileSystem();
+    this.walRootDir = services.getMasterFileSystem().getWALRootDir();
+    this.remoteWALDir = new Path(this.walRootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME);
+    this.workerStorage = new ZKSyncReplicationReplayWALWorkerStorage(services.getZooKeeper(),
+        services.getConfiguration());
+    checkReplayingWALDir();
+  }
+
+  private void checkReplayingWALDir() throws IOException, ReplicationException {
+    FileStatus[] files = fs.listStatus(remoteWALDir);
+    for (FileStatus file : files) {
+      String name = file.getPath().getName();
+      if (name.endsWith(REMOTE_WAL_REPLAY_SUFFIX)) {
+        String peerId = name.substring(0, name.length() - REMOTE_WAL_REPLAY_SUFFIX.length());
+        workers.put(peerId, workerStorage.getPeerWorkers(peerId));
+      }
+    }
+  }
+
+  public void registerPeer(String peerId) throws ReplicationException {
+    workers.put(peerId, new HashSet<>());
+    workerStorage.addPeer(peerId);
+  }
+
+  public void unregisterPeer(String peerId) throws ReplicationException {
+    workers.remove(peerId);
+    workerStorage.removePeer(peerId);
+  }
+
+  public ServerName getPeerWorker(String peerId) throws ReplicationException {
+    Optional<ServerName> worker = Optional.empty();
+    ServerName workerServer = null;
+    synchronized (workerLock) {
+      worker = services.getServerManager().getOnlineServers().keySet().stream()
+          .filter(server -> !workers.get(peerId).contains(server)).findFirst();
+      if (worker.isPresent()) {
+        workerServer = worker.get();
+        workers.get(peerId).add(workerServer);
+      }
+    }
+    if (workerServer != null) {
+      workerStorage.addPeerWorker(peerId, workerServer);
+    }
+    return workerServer;
+  }
+
+  public void removePeerWorker(String peerId, ServerName worker) throws ReplicationException {
+    synchronized (workerLock) {
+      workers.get(peerId).remove(worker);
+    }
+    workerStorage.removePeerWorker(peerId, worker);
+  }
+  public void createPeerRemoteWALDir(String peerId) throws IOException {
+    Path peerRemoteWALDir = getPeerRemoteWALDir(remoteWALDir, peerId);
+    if (!fs.exists(peerRemoteWALDir) && !fs.mkdirs(peerRemoteWALDir)) {
+      throw new IOException("Unable to mkdir " + peerRemoteWALDir);
+    }
+  }
+
+  private void rename(Path src, Path dst, String peerId) throws IOException {
+    if (fs.exists(src)) {
+      deleteDir(dst, peerId);
+      if (!fs.rename(src, dst)) {
+        throw new IOException(
+          "Failed to rename dir from " + src + " to " + dst + " for peer id=" + peerId);
+      }
+      LOG.info("Renamed dir from {} to {} for peer id={}", src, dst, peerId);
+    } else if (!fs.exists(dst)) {
+      throw new IOException(
+          "Want to rename from " + src + " to " + dst + ", but they both do not exist");
+    }
+  }
+
+  public void renameToPeerReplayWALDir(String peerId) throws IOException {
+    rename(getPeerRemoteWALDir(remoteWALDir, peerId), getPeerReplayWALDir(remoteWALDir, peerId),
+        peerId);
+  }
+
+  public void renameToPeerSnapshotWALDir(String peerId) throws IOException {
+    rename(getPeerReplayWALDir(remoteWALDir, peerId), getPeerSnapshotWALDir(remoteWALDir, peerId),
+        peerId);
+  }
+
+  public List<Path> getReplayWALsAndCleanUpUnusedFiles(String peerId) throws IOException {
+    Path peerReplayWALDir = getPeerReplayWALDir(remoteWALDir, peerId);
+    for (FileStatus status : fs.listStatus(peerReplayWALDir,
+      p -> p.getName().endsWith(ReplicationUtils.RENAME_WAL_SUFFIX))) {
+      Path src = status.getPath();
+      String srcName = src.getName();
+      String dstName =
+          srcName.substring(0, srcName.length() - ReplicationUtils.RENAME_WAL_SUFFIX.length());
+      FSUtils.renameFile(fs, src, new Path(src.getParent(), dstName));
+    }
+    List<Path> wals = new ArrayList<>();
+    for (FileStatus status : fs.listStatus(peerReplayWALDir)) {
+      Path path = status.getPath();
+      if (path.getName().endsWith(ReplicationUtils.SYNC_WAL_SUFFIX)) {
+        wals.add(path);
+      } else {
+        if (!fs.delete(path, true)) {
+          LOG.warn("Can not delete unused file: " + path);
+        }
+      }
+    }
+    return wals;
+  }
+
+  public void snapshotPeerReplayWALDir(String peerId) throws IOException {
+    Path peerReplayWALDir = getPeerReplayWALDir(remoteWALDir, peerId);
+    if (fs.exists(peerReplayWALDir) && !fs.delete(peerReplayWALDir, true)) {
+      throw new IOException(
+          "Failed to remove replay wals dir " + peerReplayWALDir + " for peer id=" + peerId);
+    }
+  }
+
+  private void deleteDir(Path dir, String peerId) throws IOException {
+    if (!fs.delete(dir, true) && fs.exists(dir)) {
+      throw new IOException("Failed to remove dir " + dir + " for peer id=" + peerId);
+    }
+  }
+
+  public void removePeerRemoteWALs(String peerId) throws IOException {
+    deleteDir(getPeerRemoteWALDir(remoteWALDir, peerId), peerId);
+    deleteDir(getPeerReplayWALDir(remoteWALDir, peerId), peerId);
+    deleteDir(getPeerSnapshotWALDir(remoteWALDir, peerId), peerId);
+  }
+
+  public String removeWALRootPath(Path path) {
+    String pathStr = path.toString();
+    // remove the "/" too.
+    return pathStr.substring(walRootDir.toString().length() + 1);
+  }
+
+  public void finishReplayWAL(String wal) throws IOException {
+    Path walPath = new Path(walRootDir, wal);
+    fs.truncate(walPath, 0);
+  }
+
+  public boolean isReplayWALFinished(String wal) throws IOException {
+    Path walPath = new Path(walRootDir, wal);
+    return fs.getFileStatus(walPath).getLen() == 0;
+  }
+
+  @VisibleForTesting
+  public Path getRemoteWALDir() {
+    return remoteWALDir;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java
new file mode 100644
index 0000000..26d6a3f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALStateData;
+
+@InterfaceAudience.Private
+public class SyncReplicationReplayWALProcedure
+    extends StateMachineProcedure<MasterProcedureEnv, SyncReplicationReplayWALState>
+    implements PeerProcedureInterface {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SyncReplicationReplayWALProcedure.class);
+
+  private String peerId;
+
+  private ServerName worker = null;
+
+  private List<String> wals;
+
+  public SyncReplicationReplayWALProcedure() {
+  }
+
+  public SyncReplicationReplayWALProcedure(String peerId, List<String> wals) {
+    this.peerId = peerId;
+    this.wals = wals;
+  }
+
+  @Override protected Flow executeFromState(MasterProcedureEnv env,
+      SyncReplicationReplayWALState state)
+      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+    SyncReplicationReplayWALManager syncReplicationReplayWALManager =
+        env.getMasterServices().getSyncReplicationReplayWALManager();
+    switch (state) {
+      case ASSIGN_WORKER:
+        try {
+          worker = syncReplicationReplayWALManager.getPeerWorker(peerId);
+        } catch (ReplicationException e) {
+          LOG.info("Failed to get worker to replay wals {} for peer id={}, retry", wals, peerId);
+          throw new ProcedureYieldException();
+        }
+        if (worker == null) {
+          LOG.info("No worker to replay wals {} for peer id={}, retry", wals, peerId);
+          setNextState(SyncReplicationReplayWALState.ASSIGN_WORKER);
+        } else {
+          setNextState(SyncReplicationReplayWALState.DISPATCH_WALS_TO_WORKER);
+        }
+        return Flow.HAS_MORE_STATE;
+      case DISPATCH_WALS_TO_WORKER:
+        addChildProcedure(new SyncReplicationReplayWALRemoteProcedure(peerId, wals, worker));
+        setNextState(SyncReplicationReplayWALState.RELEASE_WORKER);
+        return Flow.HAS_MORE_STATE;
+      case RELEASE_WORKER:
+        boolean finished = false;
+        try {
+          finished = syncReplicationReplayWALManager.isReplayWALFinished(wals.get(0));
+        } catch (IOException e) {
+          LOG.info("Failed to check whether replay wals {} finished for peer id={}", wals, peerId);
+          throw new ProcedureYieldException();
+        }
+        try {
+          syncReplicationReplayWALManager.removePeerWorker(peerId, worker);
+        } catch (ReplicationException e) {
+          LOG.info("Failed to remove worker for peer id={}, retry", peerId);
+          throw new ProcedureYieldException();
+        }
+        if (!finished) {
+          LOG.info("Failed to replay wals {} for peer id={}, retry", wals, peerId);
+          setNextState(SyncReplicationReplayWALState.ASSIGN_WORKER);
+          return Flow.HAS_MORE_STATE;
+        }
+        return Flow.NO_MORE_STATE;
+      default:
+        throw new UnsupportedOperationException("unhandled state=" + state);
+    }
+  }
+
+  @Override
+  protected void rollbackState(MasterProcedureEnv env,
+      SyncReplicationReplayWALState state)
+      throws IOException, InterruptedException {
+    if (state == getInitialState()) {
+      return;
+    }
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected SyncReplicationReplayWALState getState(int state) {
+    return SyncReplicationReplayWALState.forNumber(state);
+  }
+
+  @Override
+  protected int getStateId(
+      SyncReplicationReplayWALState state) {
+    return state.getNumber();
+  }
+
+  @Override
+  protected SyncReplicationReplayWALState getInitialState() {
+    return SyncReplicationReplayWALState.ASSIGN_WORKER;
+  }
+
+  @Override
+  protected void serializeStateData(ProcedureStateSerializer serializer)
+      throws IOException {
+    SyncReplicationReplayWALStateData.Builder builder =
+        SyncReplicationReplayWALStateData.newBuilder();
+    builder.setPeerId(peerId);
+    wals.stream().forEach(builder::addWal);
+    serializer.serialize(builder.build());
+  }
+
+  @Override
+  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    SyncReplicationReplayWALStateData data =
+        serializer.deserialize(SyncReplicationReplayWALStateData.class);
+    peerId = data.getPeerId();
+    wals = new ArrayList<>();
+    data.getWalList().forEach(wals::add);
+  }
+
+  @Override
+  public String getPeerId() {
+    return peerId;
+  }
+
+  @Override
+  public PeerOperationType getPeerOperationType() {
+    return PeerOperationType.SYNC_REPLICATION_REPLAY_WAL;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java
new file mode 100644
index 0000000..9f4f330
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
+import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
+import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
+import org.apache.hadoop.hbase.replication.regionserver.ReplaySyncReplicationWALCallable;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALRemoteStateData;
+
+@InterfaceAudience.Private
+public class SyncReplicationReplayWALRemoteProcedure extends Procedure<MasterProcedureEnv>
+    implements RemoteProcedure<MasterProcedureEnv, ServerName>, PeerProcedureInterface {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SyncReplicationReplayWALRemoteProcedure.class);
+
+  private String peerId;
+
+  private ServerName targetServer;
+
+  private List<String> wals;
+
+  private boolean dispatched;
+
+  private ProcedureEvent<?> event;
+
+  private boolean succ;
+
+  public SyncReplicationReplayWALRemoteProcedure() {
+  }
+
+  public SyncReplicationReplayWALRemoteProcedure(String peerId, List<String> wals,
+      ServerName targetServer) {
+    this.peerId = peerId;
+    this.wals = wals;
+    this.targetServer = targetServer;
+  }
+
+  @Override
+  public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
+    ReplaySyncReplicationWALParameter.Builder builder =
+        ReplaySyncReplicationWALParameter.newBuilder();
+    builder.setPeerId(peerId);
+    wals.stream().forEach(builder::addWal);
+    return new ServerOperation(this, getProcId(), ReplaySyncReplicationWALCallable.class,
+        builder.build().toByteArray());
+  }
+
+  @Override
+  public void remoteCallFailed(MasterProcedureEnv env, ServerName remote, IOException exception) {
+    complete(env, exception);
+  }
+
+  @Override
+  public void remoteOperationCompleted(MasterProcedureEnv env) {
+    complete(env, null);
+  }
+
+  @Override
+  public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) {
+    complete(env, error);
+  }
+
+  private void complete(MasterProcedureEnv env, Throwable error) {
+    if (event == null) {
+      LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery",
+        getProcId());
+      return;
+    }
+    if (error != null) {
+      LOG.warn("Replay wals {} on {} failed for peer id={}", wals, targetServer, peerId, error);
+      this.succ = false;
+    } else {
+      truncateWALs(env);
+      LOG.info("Replay wals {} on {} succeed for peer id={}", wals, targetServer, peerId);
+      this.succ = true;
+    }
+    event.wake(env.getProcedureScheduler());
+    event = null;
+  }
+
+  /**
+   * Only truncate the wals one by one when the task succeeds. The parent procedure will check the
+   * first wal's length to know whether this task succeeded.
+   */
+  private void truncateWALs(MasterProcedureEnv env) {
+    String firstWal = wals.get(0);
+    try {
+      
env.getMasterServices().getSyncReplicationReplayWALManager().finishReplayWAL(firstWal);
+    } catch (IOException e) {
+      // It is idempotent to rerun this task, so just ignore this exception and return.
+      LOG.warn("Failed to truncate wal {} for peer id={}", firstWal, peerId, 
e);
+      return;
+    }
+    for (int i = 1; i < wals.size(); i++) {
+      String wal = wals.get(i);
+      try {
+        
env.getMasterServices().getSyncReplicationReplayWALManager().finishReplayWAL(wal);
+      } catch (IOException e1) {
+        try {
+          // retry
+          
env.getMasterServices().getSyncReplicationReplayWALManager().finishReplayWAL(wal);
+        } catch (IOException e2) {
+          // The parent procedure only checks the first wal's length, so just ignore this exception.
+          LOG.warn("Failed to truncate wal {} for peer id={}", wal, peerId, 
e2);
+        }
+      }
+    }
+  }
+
+  @Override
+  protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
+      throws ProcedureYieldException, ProcedureSuspendedException, 
InterruptedException {
+    if (dispatched) {
+      if (succ) {
+        return null;
+      }
+      // retry
+      dispatched = false;
+    }
+
+    // Dispatch task to target server
+    try {
+      env.getRemoteDispatcher().addOperationToNode(targetServer, this);
+    } catch (FailedRemoteDispatchException e) {
+      LOG.warn(
+          "Can not add remote operation for replay wals {} on {} for peer 
id={}, "
+              + "this usually because the server is already dead, retry",
+          wals, targetServer, peerId);
+      throw new ProcedureYieldException();
+    }
+    dispatched = true;
+    event = new ProcedureEvent<>(this);
+    event.suspendIfNotReady(this);
+    throw new ProcedureSuspendedException();
+  }
+
+  @Override
+  protected void rollback(MasterProcedureEnv env) throws IOException, 
InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected boolean abort(MasterProcedureEnv env) {
+    return false;
+  }
+
+  @Override
+  protected void serializeStateData(ProcedureStateSerializer serializer)
+      throws IOException {
+    SyncReplicationReplayWALRemoteStateData.Builder builder =
+        SyncReplicationReplayWALRemoteStateData.newBuilder().setPeerId(peerId)
+            .setTargetServer(ProtobufUtil.toServerName(targetServer));
+    wals.stream().forEach(builder::addWal);
+    serializer.serialize(builder.build());
+  }
+
+  @Override
+  protected void deserializeStateData(ProcedureStateSerializer serializer) 
throws IOException {
+    SyncReplicationReplayWALRemoteStateData data =
+        serializer.deserialize(SyncReplicationReplayWALRemoteStateData.class);
+    peerId = data.getPeerId();
+    wals = new ArrayList<>();
+    data.getWalList().forEach(wals::add);
+    targetServer = ProtobufUtil.toServerName(data.getTargetServer());
+  }
+
+  @Override
+  public String getPeerId() {
+    return peerId;
+  }
+
+  @Override
+  public PeerOperationType getPeerOperationType() {
+    return PeerOperationType.SYNC_REPLICATION_REPLAY_WAL_REMOTE;
+  }
+}
\ No newline at end of file
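
The success handshake above deserves a note: the remote task truncates every wal it replayed,
and the parent procedure later decides whether to re-dispatch a batch by looking only at the
length of the batch's first wal. Below is a minimal sketch of that check, assuming the wals sit
under a single root dir and that zero length means "already replayed"; the class and method
names are illustrative and not part of this commit.

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReplayWALCheckSketch {

  private final FileSystem fs;

  private final Path walRootDir;

  public ReplayWALCheckSketch(FileSystem fs, Path walRootDir) {
    this.fs = fs;
    this.walRootDir = walRootDir;
  }

  // A wal counts as replayed iff the remote procedure truncated it to zero length
  // (see truncateWALs above); the parent only needs to inspect the first wal of a
  // batch to decide whether the batch must be dispatched again.
  public boolean isReplayWALFinished(String wal) throws IOException {
    return fs.getFileStatus(new Path(walRootDir, wal)).getLen() == 0;
  }
}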

http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
index 66f67dd..c650974 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
@@ -186,8 +186,8 @@ public class TransitPeerSyncReplicationStateProcedure
     }
   }
 
-  private void replayRemoteWAL() {
-    addChildProcedure(new RecoverStandbyProcedure(peerId));
+  private void replayRemoteWAL(boolean serial) {
+    addChildProcedure(new RecoverStandbyProcedure(peerId, serial));
   }
 
   @Override
@@ -232,7 +232,7 @@ public class TransitPeerSyncReplicationStateProcedure
         setNextStateAfterRefreshBegin();
         return Flow.HAS_MORE_STATE;
       case REPLAY_REMOTE_WAL_IN_PEER:
-        replayRemoteWAL();
+        
replayRemoteWAL(env.getReplicationPeerManager().getPeerConfig(peerId).get().isSerial());
         setNextState(
           
PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
         return Flow.HAS_MORE_STATE;

http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ZKSyncReplicationReplayWALWorkerStorage.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ZKSyncReplicationReplayWALWorkerStorage.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ZKSyncReplicationReplayWALWorkerStorage.java
new file mode 100644
index 0000000..5991cf0
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ZKSyncReplicationReplayWALWorkerStorage.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ZKReplicationStorageBase;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+
+@InterfaceAudience.Private
+public class ZKSyncReplicationReplayWALWorkerStorage extends 
ZKReplicationStorageBase {
+
+  public static final String WORKERS_ZNODE = 
"zookeeper.znode.sync.replication.replaywal.workers";
+
+  public static final String WORKERS_ZNODE_DEFAULT = "replaywal-workers";
+
+  /**
+   * The name of the znode that contains a list of workers to replay wals.
+   */
+  private final String workersZNode;
+
+  public ZKSyncReplicationReplayWALWorkerStorage(ZKWatcher zookeeper, 
Configuration conf) {
+    super(zookeeper, conf);
+    String workersZNodeName = conf.get(WORKERS_ZNODE, WORKERS_ZNODE_DEFAULT);
+    workersZNode = ZNodePaths.joinZNode(replicationZNode, workersZNodeName);
+  }
+
+  private String getPeerNode(String peerId) {
+    return ZNodePaths.joinZNode(workersZNode, peerId);
+  }
+
+  public void addPeer(String peerId) throws ReplicationException {
+    try {
+      ZKUtil.createWithParents(zookeeper, getPeerNode(peerId));
+    } catch (KeeperException e) {
+      throw new ReplicationException(
+          "Failed to add peer id=" + peerId + " to replaywal-workers storage", 
e);
+    }
+  }
+
+  public void removePeer(String peerId) throws ReplicationException {
+    try {
+      ZKUtil.deleteNodeRecursively(zookeeper, getPeerNode(peerId));
+    } catch (KeeperException e) {
+      throw new ReplicationException(
+          "Failed to remove peer id=" + peerId + " to replaywal-workers 
storage", e);
+    }
+  }
+
+  private String getPeerWorkerNode(String peerId, ServerName worker) {
+    return ZNodePaths.joinZNode(getPeerNode(peerId), worker.getServerName());
+  }
+
+  public void addPeerWorker(String peerId, ServerName worker) throws 
ReplicationException {
+    try {
+      ZKUtil.createWithParents(zookeeper, getPeerWorkerNode(peerId, worker));
+    } catch (KeeperException e) {
+      throw new ReplicationException("Failed to add worker=" + worker + " for 
peer id=" + peerId,
+          e);
+    }
+  }
+
+  public void removePeerWorker(String peerId, ServerName worker) throws 
ReplicationException {
+    try {
+      ZKUtil.deleteNode(zookeeper, getPeerWorkerNode(peerId, worker));
+    } catch (KeeperException e) {
+      throw new ReplicationException("Failed to remove worker=" + worker + " 
for peer id=" + peerId,
+          e);
+    }
+  }
+
+  public Set<ServerName> getPeerWorkers(String peerId) throws 
ReplicationException {
+    try {
+      List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, 
getPeerNode(peerId));
+      if (children == null) {
+        return new HashSet<>();
+      }
+      return 
children.stream().map(ServerName::valueOf).collect(Collectors.toSet());
+    } catch (KeeperException e) {
+      throw new ReplicationException("Failed to list workers for peer id=" + 
peerId, e);
+    }
+  }
+}
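
For orientation, a short usage sketch of the worker storage above: register a peer, record the
region server that picked up a replay task, list the registered workers after a master restart,
and clean up when the task finishes. The ZKWatcher and Configuration are assumed to come from
the surrounding master context; the wrapper class itself is illustrative and not part of this
commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.replication.ZKSyncReplicationReplayWALWorkerStorage;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;

public class WorkerStorageUsageSketch {

  public static void trackWorker(ZKWatcher zk, Configuration conf, String peerId,
      ServerName worker) throws ReplicationException {
    ZKSyncReplicationReplayWALWorkerStorage storage =
        new ZKSyncReplicationReplayWALWorkerStorage(zk, conf);
    // Create the per-peer znode, then record the worker that took a replay task.
    storage.addPeer(peerId);
    storage.addPeerWorker(peerId, worker);
    // A master that restarts mid-recovery can list the registered workers to see
    // which region servers may still be replaying wals for this peer.
    if (storage.getPeerWorkers(peerId).contains(worker)) {
      // Release the worker once its batch is done.
      storage.removePeerWorker(peerId, worker);
    }
    // Drop the per-peer node when the whole recovery finishes.
    storage.removePeer(peerId);
  }
}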

http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
index 3cf065c..24963f1 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
@@ -21,6 +21,8 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.locks.Lock;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -32,6 +34,7 @@ import 
org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.KeyLocker;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WAL.Reader;
@@ -68,31 +71,28 @@ public class ReplaySyncReplicationWALCallable implements 
RSProcedureCallable {
 
   private String peerId;
 
-  private String wal;
+  private List<String> wals = new ArrayList<>();
 
   private Exception initError;
 
   private long batchSize;
 
+  private final KeyLocker<String> peersLock = new KeyLocker<>();
+
   @Override
   public Void call() throws Exception {
     if (initError != null) {
       throw initError;
     }
-    LOG.info("Received a replay sync replication wal {} event, peerId={}", 
wal, peerId);
+    LOG.info("Received a replay sync replication wals {} event, peerId={}", 
wals, peerId);
     if (rs.getReplicationSinkService() != null) {
-      try (Reader reader = getReader()) {
-        List<Entry> entries = readWALEntries(reader);
-        while (!entries.isEmpty()) {
-          Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> pair = 
ReplicationProtbufUtil
-              .buildReplicateWALEntryRequest(entries.toArray(new 
Entry[entries.size()]));
-          ReplicateWALEntryRequest request = pair.getFirst();
-          
rs.getReplicationSinkService().replicateLogEntries(request.getEntryList(),
-            pair.getSecond(), request.getReplicationClusterId(),
-            request.getSourceBaseNamespaceDirPath(), 
request.getSourceHFileArchiveDirPath());
-          // Read next entries.
-          entries = readWALEntries(reader);
+      Lock peerLock = peersLock.acquireLock(wals.get(0));
+      try {
+        for (String wal : wals) {
+          replayWAL(wal);
         }
+      } finally {
+        peerLock.unlock();
       }
     }
     return null;
@@ -107,7 +107,7 @@ public class ReplaySyncReplicationWALCallable implements 
RSProcedureCallable {
       ReplaySyncReplicationWALParameter param =
           ReplaySyncReplicationWALParameter.parseFrom(parameter);
       this.peerId = param.getPeerId();
-      this.wal = param.getWal();
+      param.getWalList().forEach(this.wals::add);
       this.batchSize = 
rs.getConfiguration().getLong(REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE,
         DEFAULT_REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE);
     } catch (InvalidProtocolBufferException e) {
@@ -120,7 +120,23 @@ public class ReplaySyncReplicationWALCallable implements 
RSProcedureCallable {
     return EventType.RS_REPLAY_SYNC_REPLICATION_WAL;
   }
 
-  private Reader getReader() throws IOException {
+  private void replayWAL(String wal) throws IOException {
+    try (Reader reader = getReader(wal)) {
+      List<Entry> entries = readWALEntries(reader);
+      while (!entries.isEmpty()) {
+        Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> pair = 
ReplicationProtbufUtil
+            .buildReplicateWALEntryRequest(entries.toArray(new 
Entry[entries.size()]));
+        ReplicateWALEntryRequest request = pair.getFirst();
+        
rs.getReplicationSinkService().replicateLogEntries(request.getEntryList(),
+            pair.getSecond(), request.getReplicationClusterId(),
+            request.getSourceBaseNamespaceDirPath(), 
request.getSourceHFileArchiveDirPath());
+        // Read next entries.
+        entries = readWALEntries(reader);
+      }
+    }
+  }
+
+  private Reader getReader(String wal) throws IOException {
     Path path = new Path(rs.getWALRootDir(), wal);
     long length = rs.getWALFileSystem().getFileStatus(path).getLen();
     try {
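
The callable above wraps each batch in a per-key lock, taking the batch's first wal as the key.
A generic sketch of that lock-per-key pattern follows; the wrapper class and the Runnable
parameter are hypothetical, only KeyLocker and Lock are existing APIs. Note that such a lock
only excludes callers that share the same KeyLocker instance, which is the usual reason to keep
the locker in a long-lived or shared field.

import java.util.concurrent.locks.Lock;

import org.apache.hadoop.hbase.util.KeyLocker;

public class KeyLockerSketch {

  private final KeyLocker<String> locker = new KeyLocker<>();

  // Runs the task while holding the per-key lock; callers of this instance that
  // pass the same key (for the callable above, the first wal of a batch) are
  // mutually exclusive.
  public void runExclusively(String key, Runnable task) {
    Lock lock = locker.acquireLock(key);
    try {
      task.run();
    } finally {
      lock.unlock();
    }
  }
}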

http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index 48d47ea..ac20dbd 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -41,8 +41,8 @@ import 
org.apache.hadoop.hbase.master.assignment.AssignmentManager;
 import org.apache.hadoop.hbase.master.locking.LockManager;
 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import 
org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager;
 import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
+import 
org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
 import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
 import org.apache.hadoop.hbase.procedure2.LockedResource;
@@ -476,7 +476,7 @@ public class MockNoopMasterServices implements 
MasterServices {
   }
 
   @Override
-  public ReplaySyncReplicationWALManager getReplaySyncReplicationWALManager() {
+  public SyncReplicationReplayWALManager getSyncReplicationReplayWALManager() {
     return null;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
index a20edd3..f765139 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -78,7 +78,7 @@ public class SyncReplicationTestBase {
 
   protected static Path REMOTE_WAL_DIR2;
 
-  private static void initTestingUtility(HBaseTestingUtility util, String 
zkParent) {
+  protected static void initTestingUtility(HBaseTestingUtility util, String 
zkParent) {
     util.setZkCluster(ZK_UTIL.getZkCluster());
     Configuration conf = util.getConfiguration();
     conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkParent);
@@ -102,8 +102,8 @@ public class SyncReplicationTestBase {
     ZK_UTIL.startMiniZKCluster();
     initTestingUtility(UTIL1, "/cluster1");
     initTestingUtility(UTIL2, "/cluster2");
-    UTIL1.startMiniCluster(3);
-    UTIL2.startMiniCluster(3);
+    UTIL1.startMiniCluster(2, 3);
+    UTIL2.startMiniCluster(2, 3);
     TableDescriptor td =
       
TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder
         
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();

http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillMaster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillMaster.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillMaster.java
new file mode 100644
index 0000000..6265f5c
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillMaster.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplicationStandbyKillMaster extends 
SyncReplicationTestBase {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestSyncReplicationStandbyKillMaster.class);
+
+  private final long SLEEP_TIME = 2000;
+
+  private final int COUNT = 1000;
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestSyncReplicationStandbyKillMaster.class);
+
+  @Test
+  public void testStandbyKillMaster() throws Exception {
+    MasterFileSystem mfs = 
UTIL2.getHBaseCluster().getMaster().getMasterFileSystem();
+    Path remoteWALDir = getRemoteWALDir(mfs, PEER_ID);
+    assertFalse(mfs.getWALFileSystem().exists(remoteWALDir));
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+        SyncReplicationState.STANDBY);
+    assertTrue(mfs.getWALFileSystem().exists(remoteWALDir));
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+        SyncReplicationState.ACTIVE);
+
+    // Disable async replication and write data, then shutdown
+    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
+    write(UTIL1, 0, COUNT);
+    UTIL1.shutdownMiniCluster();
+
+    Thread t = new Thread(() -> {
+      try {
+        Thread.sleep(SLEEP_TIME);
+        UTIL2.getMiniHBaseCluster().getMaster().stop("Stop master for test");
+      } catch (Exception e) {
+        LOG.error("Failed to stop master", e);
+      }
+    });
+    t.start();
+
+    // Transit standby to DA to replay logs
+    try {
+      UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+          SyncReplicationState.DOWNGRADE_ACTIVE);
+    } catch (Exception e) {
+      LOG.error("Failed to transit standby cluster to " + 
SyncReplicationState.DOWNGRADE_ACTIVE);
+    }
+
+    while (UTIL2.getAdmin().getReplicationPeerSyncReplicationState(PEER_ID)
+        != SyncReplicationState.DOWNGRADE_ACTIVE) {
+      Thread.sleep(SLEEP_TIME);
+    }
+    verify(UTIL2, 0, COUNT);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java
new file mode 100644
index 0000000..3c9724f
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplicationStandbyKillRS extends SyncReplicationTestBase {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestSyncReplicationStandbyKillRS.class);
+
+  private final long SLEEP_TIME = 1000;
+
+  private final int COUNT = 1000;
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestSyncReplicationStandbyKillRS.class);
+
+  @Test
+  public void testStandbyKillRegionServer() throws Exception {
+    MasterFileSystem mfs = 
UTIL2.getHBaseCluster().getMaster().getMasterFileSystem();
+    Path remoteWALDir = getRemoteWALDir(mfs, PEER_ID);
+    assertFalse(mfs.getWALFileSystem().exists(remoteWALDir));
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+        SyncReplicationState.STANDBY);
+    assertTrue(mfs.getWALFileSystem().exists(remoteWALDir));
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+        SyncReplicationState.ACTIVE);
+
+    // Disable async replication and write data, then shutdown
+    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
+    write(UTIL1, 0, COUNT);
+    UTIL1.shutdownMiniCluster();
+
+    JVMClusterUtil.MasterThread activeMaster = 
UTIL2.getMiniHBaseCluster().getMasterThread();
+    Thread t = new Thread(() -> {
+      try {
+        List<JVMClusterUtil.RegionServerThread> regionServers =
+            UTIL2.getMiniHBaseCluster().getLiveRegionServerThreads();
+        for (JVMClusterUtil.RegionServerThread rst : regionServers) {
+          ServerName serverName = rst.getRegionServer().getServerName();
+          rst.getRegionServer().stop("Stop RS for test");
+          waitForRSShutdownToStartAndFinish(activeMaster, serverName);
+          JVMClusterUtil.RegionServerThread restarted =
+              UTIL2.getMiniHBaseCluster().startRegionServer();
+          restarted.waitForServerOnline();
+        }
+      } catch (Exception e) {
+        LOG.error("Failed to kill RS", e);
+      }
+    });
+    t.start();
+
+    // Transit standby to DA to replay logs
+    try {
+      UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+          SyncReplicationState.DOWNGRADE_ACTIVE);
+    } catch (Exception e) {
+      LOG.error("Failed to transit standby cluster to " + 
SyncReplicationState.DOWNGRADE_ACTIVE);
+    }
+
+    while (UTIL2.getAdmin().getReplicationPeerSyncReplicationState(PEER_ID)
+        != SyncReplicationState.DOWNGRADE_ACTIVE) {
+      Thread.sleep(SLEEP_TIME);
+    }
+    verify(UTIL2, 0, COUNT);
+  }
+
+  private void waitForRSShutdownToStartAndFinish(JVMClusterUtil.MasterThread 
activeMaster,
+      ServerName serverName) throws InterruptedException {
+    ServerManager sm = activeMaster.getMaster().getServerManager();
+    // First wait for it to be in dead list
+    while (!sm.getDeadServers().isDeadServer(serverName)) {
+      LOG.debug("Waiting for [" + serverName + "] to be listed as dead in 
master");
+      Thread.sleep(SLEEP_TIME);
+    }
+    LOG.debug("Server [" + serverName + "] marked as dead, waiting for it to " 
+
+        "finish dead processing");
+    while (sm.areDeadServersInProgress()) {
+      LOG.debug("Server [" + serverName + "] still being processed, waiting");
+      Thread.sleep(SLEEP_TIME);
+    }
+    LOG.debug("Server [" + serverName + "] done with server shutdown 
processing");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
index 2563669..d01a0ac 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.replication.RecoverStandbyProcedure;
-import 
org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager;
+import 
org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
@@ -92,7 +92,7 @@ public class TestRecoverStandbyProcedure {
 
   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
 
-  private static ReplaySyncReplicationWALManager 
replaySyncReplicationWALManager;
+  private static SyncReplicationReplayWALManager 
syncReplicationReplayWALManager;
 
   private static ProcedureExecutor<MasterProcedureEnv> procExec;
 
@@ -107,7 +107,7 @@ public class TestRecoverStandbyProcedure {
     conf = UTIL.getConfiguration();
     HMaster master = UTIL.getHBaseCluster().getMaster();
     fs = master.getMasterFileSystem().getWALFileSystem();
-    replaySyncReplicationWALManager = 
master.getReplaySyncReplicationWALManager();
+    syncReplicationReplayWALManager = 
master.getSyncReplicationReplayWALManager();
     procExec = master.getMasterProcedureExecutor();
   }
 
@@ -138,7 +138,7 @@ public class TestRecoverStandbyProcedure {
   @Test
   public void testRecoverStandby() throws IOException, 
StreamLacksCapabilityException {
     setupSyncReplicationWALs();
-    long procId = procExec.submitProcedure(new 
RecoverStandbyProcedure(PEER_ID));
+    long procId = procExec.submitProcedure(new 
RecoverStandbyProcedure(PEER_ID, false));
     ProcedureTestingUtility.waitProcedure(procExec, procId);
     ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
 
@@ -153,7 +153,7 @@ public class TestRecoverStandbyProcedure {
 
   private void setupSyncReplicationWALs() throws IOException, 
StreamLacksCapabilityException {
     Path peerRemoteWALDir = ReplicationUtils
-      .getPeerRemoteWALDir(replaySyncReplicationWALManager.getRemoteWALDir(), 
PEER_ID);
+      .getPeerRemoteWALDir(syncReplicationReplayWALManager.getRemoteWALDir(), 
PEER_ID);
     if (!fs.exists(peerRemoteWALDir)) {
       fs.mkdirs(peerRemoteWALDir);
     }
