This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 5f291e82436c767a21eb16a08a57ce6c49395302
Author: Duo Zhang <zhang...@apache.org>
AuthorDate: Fri Jan 11 16:22:24 2019 +0800

    HBASE-21671 Rewrite RegionReplicaReplicationEndpoint to use 
AsyncClusterConnection
---
 .../hadoop/hbase/client/AsyncConnectionImpl.java   |  24 +-
 .../hbase/client/AsyncClusterConnection.java       |  17 +
 .../hbase/client/AsyncClusterConnectionImpl.java   |  80 +++
 .../AsyncRegionReplicaReplayRetryingCaller.java    | 146 ++++
 .../hbase/client/AsyncRegionServerAdmin.java       |   5 +-
 .../hbase/client/ClusterConnectionFactory.java     |   2 +-
 .../hbase/protobuf/ReplicationProtbufUtil.java     |  31 +-
 .../handler/RegionReplicaFlushHandler.java         |   3 +-
 .../hbase/replication/ReplicationEndpoint.java     |  35 +-
 .../RegionReplicaReplicationEndpoint.java          | 782 +++++++--------------
 .../regionserver/ReplicationSource.java            |   2 +-
 .../hbase/client/TestAsyncTableNoncedRetry.java    |   2 +-
 .../TestRegionReplicaReplicationEndpoint.java      |  56 +-
 ...stRegionReplicaReplicationEndpointNoMaster.java |  99 ++-
 14 files changed, 627 insertions(+), 657 deletions(-)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index be73dd8..9adad74 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -55,7 +55,6 @@ import 
org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
 
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse;
@@ -65,7 +64,7 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterServ
  * The implementation of AsyncConnection.
  */
 @InterfaceAudience.Private
-class AsyncConnectionImpl implements AsyncClusterConnection {
+class AsyncConnectionImpl implements AsyncConnection {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(AsyncConnectionImpl.class);
 
@@ -85,7 +84,7 @@ class AsyncConnectionImpl implements AsyncClusterConnection {
 
   private final int rpcTimeout;
 
-  private final RpcClient rpcClient;
+  protected final RpcClient rpcClient;
 
   final RpcControllerFactory rpcControllerFactory;
 
@@ -160,16 +159,10 @@ class AsyncConnectionImpl implements 
AsyncClusterConnection {
   }
 
   // ditto
-  @Override
-  public NonceGenerator getNonceGenerator() {
+  NonceGenerator getNonceGenerator() {
     return nonceGenerator;
   }
 
-  @Override
-  public RpcClient getRpcClient() {
-    return rpcClient;
-  }
-
   private ClientService.Interface createRegionServerStub(ServerName 
serverName) throws IOException {
     return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, 
rpcTimeout));
   }
@@ -360,15 +353,4 @@ class AsyncConnectionImpl implements 
AsyncClusterConnection {
     return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(
       rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), 
rpcControllerFactory);
   }
-
-  public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
-    return new AsyncRegionServerAdmin(serverName, this);
-  }
-
-  @Override
-  public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
-      boolean writeFlushWALMarker) {
-    RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin();
-    return admin.flushRegionInternal(regionName, writeFlushWALMarker);
-  }
 }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
similarity index 72%
rename from 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
rename to 
hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index f1f64ca..0ad77ba 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -17,9 +17,13 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
@@ -49,4 +53,17 @@ public interface AsyncClusterConnection extends 
AsyncConnection {
    * Flush a region and get the response.
    */
   CompletableFuture<FlushRegionResponse> flush(byte[] regionName, boolean 
writeFlushWALMarker);
+
+  /**
+   * Replicate wal edits for replica regions. The return value is the edits we 
skipped, as the
+   * original return value is useless.
+   */
+  CompletableFuture<Long> replay(TableName tableName, byte[] 
encodedRegionName, byte[] row,
+      List<Entry> entries, int replicaId, int numRetries, long 
operationTimeoutNs);
+
+  /**
+   * Return all the replicas for a region. Used for regiong replica 
replication.
+   */
+  CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, 
byte[] row,
+      boolean reload);
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
new file mode 100644
index 0000000..d61f01f
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+
+/**
+ * The implementation of AsyncClusterConnection.
+ */
+@InterfaceAudience.Private
+class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements 
AsyncClusterConnection {
+
+  public AsyncClusterConnectionImpl(Configuration conf, AsyncRegistry 
registry, String clusterId,
+      SocketAddress localAddress, User user) {
+    super(conf, registry, clusterId, localAddress, user);
+  }
+
+  @Override
+  public NonceGenerator getNonceGenerator() {
+    return super.getNonceGenerator();
+  }
+
+  @Override
+  public RpcClient getRpcClient() {
+    return rpcClient;
+  }
+
+  @Override
+  public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
+    return new AsyncRegionServerAdmin(serverName, this);
+  }
+
+  @Override
+  public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
+      boolean writeFlushWALMarker) {
+    RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin();
+    return admin.flushRegionInternal(regionName, writeFlushWALMarker);
+  }
+
+  @Override
+  public CompletableFuture<Long> replay(TableName tableName, byte[] 
encodedRegionName, byte[] row,
+      List<Entry> entries, int replicaId, int retries, long 
operationTimeoutNs) {
+    return new AsyncRegionReplicaReplayRetryingCaller(RETRY_TIMER, this,
+      ConnectionUtils.retries2Attempts(retries), operationTimeoutNs, 
tableName, encodedRegionName,
+      row, entries, replicaId).call();
+  }
+
+  @Override
+  public CompletableFuture<RegionLocations> getRegionLocations(TableName 
tableName, byte[] row,
+      boolean reload) {
+    return getLocator().getRegionLocations(tableName, row, 
RegionLocateType.CURRENT, reload, -1L);
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicaReplayRetryingCaller.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicaReplayRetryingCaller.java
new file mode 100644
index 0000000..3364f15
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicaReplayRetryingCaller.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+
+/**
+ * For replaying edits for region replica.
+ * <p/>
+ * The mainly difference here is that, every time after locating, we will 
check whether the region
+ * name is equal, if not, we will give up, as this usually means the region 
has been split or
+ * merged, and the new region(s) should already have all the data of the 
parent region(s).
+ * <p/>
+ * Notice that, the return value is the edits we skipped, as the original 
response message is not
+ * used at upper layer.
+ */
+@InterfaceAudience.Private
+public class AsyncRegionReplicaReplayRetryingCaller extends 
AsyncRpcRetryingCaller<Long> {
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(AsyncRegionReplicaReplayRetryingCaller.class);
+
+  private final TableName tableName;
+
+  private final byte[] encodedRegionName;
+
+  private final byte[] row;
+
+  private final Entry[] entries;
+
+  private final int replicaId;
+
+  public AsyncRegionReplicaReplayRetryingCaller(HashedWheelTimer retryTimer,
+      AsyncClusterConnectionImpl conn, int maxAttempts, long 
operationTimeoutNs,
+      TableName tableName, byte[] encodedRegionName, byte[] row, List<Entry> 
entries,
+      int replicaId) {
+    super(retryTimer, conn, conn.connConf.getPauseNs(), maxAttempts, 
operationTimeoutNs,
+      conn.connConf.getWriteRpcTimeoutNs(), 
conn.connConf.getStartLogErrorsCnt());
+    this.tableName = tableName;
+    this.encodedRegionName = encodedRegionName;
+    this.row = row;
+    this.entries = entries.toArray(new Entry[0]);
+    this.replicaId = replicaId;
+  }
+
+  private void call(HRegionLocation loc) {
+    if (!Bytes.equals(encodedRegionName, 
loc.getRegion().getEncodedNameAsBytes())) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(
+          "Skipping {} entries in table {} because located region {} is 
different than" +
+            " the original region {} from WALEdit",
+          entries.length, tableName, loc.getRegion().getEncodedName(),
+          Bytes.toStringBinary(encodedRegionName));
+        for (Entry entry : entries) {
+          LOG.trace("Skipping : " + entry);
+        }
+      }
+      future.complete(Long.valueOf(entries.length));
+      return;
+    }
+
+    AdminService.Interface stub;
+    try {
+      stub = conn.getAdminStub(loc.getServerName());
+    } catch (IOException e) {
+      onError(e,
+        () -> "Get async admin stub to " + loc.getServerName() + " for '" +
+          Bytes.toStringBinary(row) + "' in " + 
loc.getRegion().getEncodedName() + " of " +
+          tableName + " failed",
+        err -> conn.getLocator().updateCachedLocationOnError(loc, err));
+      return;
+    }
+    Pair<ReplicateWALEntryRequest, CellScanner> p = ReplicationProtbufUtil
+      .buildReplicateWALEntryRequest(entries, encodedRegionName, null, null, 
null);
+    resetCallTimeout();
+    controller.setCellScanner(p.getSecond());
+    stub.replay(controller, p.getFirst(), r -> {
+      if (controller.failed()) {
+        onError(controller.getFailed(),
+          () -> "Call to " + loc.getServerName() + " for '" + 
Bytes.toStringBinary(row) + "' in " +
+            loc.getRegion().getEncodedName() + " of " + tableName + " failed",
+          err -> conn.getLocator().updateCachedLocationOnError(loc, err));
+      } else {
+        future.complete(0L);
+      }
+    });
+
+  }
+
+  @Override
+  protected void doCall() {
+    long locateTimeoutNs;
+    if (operationTimeoutNs > 0) {
+      locateTimeoutNs = remainingTimeNs();
+      if (locateTimeoutNs <= 0) {
+        completeExceptionally();
+        return;
+      }
+    } else {
+      locateTimeoutNs = -1L;
+    }
+    addListener(conn.getLocator().getRegionLocation(tableName, row, replicaId,
+      RegionLocateType.CURRENT, locateTimeoutNs), (loc, error) -> {
+        if (error != null) {
+          onError(error,
+            () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName 
+ " failed", err -> {
+            });
+          return;
+        }
+        call(loc);
+      });
+  }
+}
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
similarity index 99%
rename from 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
rename to 
hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
index b9141a9..d491890 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
@@ -164,8 +164,9 @@ public class AsyncRegionServerAdmin {
       cellScanner);
   }
 
-  public CompletableFuture<ReplicateWALEntryResponse> 
replay(ReplicateWALEntryRequest request) {
-    return call((stub, controller, done) -> stub.replay(controller, request, 
done));
+  public CompletableFuture<ReplicateWALEntryResponse> 
replay(ReplicateWALEntryRequest request,
+      CellScanner cellScanner) {
+    return call((stub, controller, done) -> stub.replay(controller, request, 
done), cellScanner);
   }
 
   public CompletableFuture<RollWALWriterResponse> 
rollWALWriter(RollWALWriterRequest request) {
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
similarity index 95%
rename from 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
rename to 
hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
index 79484db..2670420 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
@@ -46,6 +46,6 @@ public final class ClusterConnectionFactory {
       SocketAddress localAddress, User user) throws IOException {
     AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
     String clusterId = FutureUtils.get(registry.getClusterId());
-    return new AsyncConnectionImpl(conf, registry, clusterId, localAddress, 
user);
+    return new AsyncClusterConnectionImpl(conf, registry, clusterId, 
localAddress, user);
   }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index 9f41a76..c39c86c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -37,7 +37,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 
 @InterfaceAudience.Private
@@ -55,20 +56,18 @@ public class ReplicationProtbufUtil {
   public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] 
entries,
       String replicationClusterId, Path sourceBaseNamespaceDir, Path 
sourceHFileArchiveDir)
       throws IOException {
-    Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = 
buildReplicateWALEntryRequest(
-      entries, null, replicationClusterId, sourceBaseNamespaceDir, 
sourceHFileArchiveDir);
+    Pair<ReplicateWALEntryRequest, CellScanner> p = 
buildReplicateWALEntryRequest(entries, null,
+      replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
     FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond()));
   }
 
   /**
    * Create a new ReplicateWALEntryRequest from a list of WAL entries
-   *
    * @param entries the WAL entries to be replicated
-   * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the 
WALEdit values
-   * found.
+   * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the 
WALEdit values found.
    */
-  public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
-      buildReplicateWALEntryRequest(final Entry[] entries) throws IOException {
+  public static Pair<ReplicateWALEntryRequest, CellScanner> 
buildReplicateWALEntryRequest(
+      final Entry[] entries) {
     return buildReplicateWALEntryRequest(entries, null, null, null, null);
   }
 
@@ -82,16 +81,14 @@ public class ReplicationProtbufUtil {
    * @param sourceHFileArchiveDir Path to the source cluster hfile archive 
directory
    * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the 
WALEdit values found.
    */
-  public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
-      buildReplicateWALEntryRequest(final Entry[] entries, byte[] 
encodedRegionName,
-          String replicationClusterId, Path sourceBaseNamespaceDir, Path 
sourceHFileArchiveDir)
-          throws IOException {
+  public static Pair<ReplicateWALEntryRequest, CellScanner> 
buildReplicateWALEntryRequest(
+      final Entry[] entries, byte[] encodedRegionName, String 
replicationClusterId,
+      Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir) {
     // Accumulate all the Cells seen in here.
     List<List<? extends Cell>> allCells = new ArrayList<>(entries.length);
     int size = 0;
-    AdminProtos.WALEntry.Builder entryBuilder = 
AdminProtos.WALEntry.newBuilder();
-    AdminProtos.ReplicateWALEntryRequest.Builder builder =
-      AdminProtos.ReplicateWALEntryRequest.newBuilder();
+    WALEntry.Builder entryBuilder = WALEntry.newBuilder();
+    ReplicateWALEntryRequest.Builder builder = 
ReplicateWALEntryRequest.newBuilder();
 
     for (Entry entry: entries) {
       entryBuilder.clear();
@@ -99,8 +96,8 @@ public class ReplicationProtbufUtil {
       try {
         keyBuilder = 
entry.getKey().getBuilder(WALCellCodec.getNoneCompressor());
       } catch (IOException e) {
-        throw new IOException(
-            "There should not throw exception since NoneCompressor do not 
throw any exceptions", e);
+        throw new AssertionError(
+          "There should not throw exception since NoneCompressor do not throw 
any exceptions", e);
       }
       if(encodedRegionName != null){
         keyBuilder.setEncodedRegionName(
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
index 0729203..cc798cc 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
@@ -185,7 +185,6 @@ public class RegionReplicaFlushHandler extends EventHandler 
{
             "Was not able to trigger a flush from primary region due to old 
server version? " +
               "Continuing to open the secondary region replica: " +
               region.getRegionInfo().getRegionNameAsString());
-          region.setReadsEnabled(true);
           break;
         }
       }
@@ -195,6 +194,6 @@ public class RegionReplicaFlushHandler extends EventHandler 
{
         throw new InterruptedIOException(e.getMessage());
       }
     }
+    region.setReadsEnabled(true);
   }
-
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index f4c37b1..ca73663 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -29,6 +29,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
@@ -53,6 +54,7 @@ public interface ReplicationEndpoint extends 
ReplicationPeerConfigListener {
 
   @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
   class Context {
+    private final Server server;
     private final Configuration localConf;
     private final Configuration conf;
     private final FileSystem fs;
@@ -64,16 +66,11 @@ public interface ReplicationEndpoint extends 
ReplicationPeerConfigListener {
     private final Abortable abortable;
 
     @InterfaceAudience.Private
-    public Context(
-        final Configuration localConf,
-        final Configuration conf,
-        final FileSystem fs,
-        final String peerId,
-        final UUID clusterId,
-        final ReplicationPeer replicationPeer,
-        final MetricsSource metrics,
-        final TableDescriptors tableDescriptors,
-        final Abortable abortable) {
+    public Context(final Server server, final Configuration localConf, final 
Configuration conf,
+        final FileSystem fs, final String peerId, final UUID clusterId,
+        final ReplicationPeer replicationPeer, final MetricsSource metrics,
+        final TableDescriptors tableDescriptors, final Abortable abortable) {
+      this.server = server;
       this.localConf = localConf;
       this.conf = conf;
       this.fs = fs;
@@ -84,34 +81,50 @@ public interface ReplicationEndpoint extends 
ReplicationPeerConfigListener {
       this.tableDescriptors = tableDescriptors;
       this.abortable = abortable;
     }
+
+    public Server getServer() {
+      return server;
+    }
+
     public Configuration getConfiguration() {
       return conf;
     }
+
     public Configuration getLocalConfiguration() {
       return localConf;
     }
+
     public FileSystem getFilesystem() {
       return fs;
     }
+
     public UUID getClusterId() {
       return clusterId;
     }
+
     public String getPeerId() {
       return peerId;
     }
+
     public ReplicationPeerConfig getPeerConfig() {
       return replicationPeer.getPeerConfig();
     }
+
     public ReplicationPeer getReplicationPeer() {
       return replicationPeer;
     }
+
     public MetricsSource getMetrics() {
       return metrics;
     }
+
     public TableDescriptors getTableDescriptors() {
       return tableDescriptors;
     }
-    public Abortable getAbortable() { return abortable; }
+
+    public Abortable getAbortable() {
+      return abortable;
+    }
   }
 
   /**
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index f7721e0..65cf9a8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -19,67 +19,47 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.client.RetryingCallable;
-import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.util.AtomicUtils;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
-import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
-import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
-import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
-import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
 import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
+import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
 
 /**
- * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint
- * which receives the WAL edits from the WAL, and sends the edits to replicas
- * of regions.
+ * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint 
which receives the WAL
+ * edits from the WAL, and sends the edits to replicas of regions.
  */
 @InterfaceAudience.Private
 public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint 
{
@@ -87,32 +67,55 @@ public class RegionReplicaReplicationEndpoint extends 
HBaseReplicationEndpoint {
   private static final Logger LOG = 
LoggerFactory.getLogger(RegionReplicaReplicationEndpoint.class);
 
   // Can be configured differently than hbase.client.retries.number
-  private static String CLIENT_RETRIES_NUMBER
-    = "hbase.region.replica.replication.client.retries.number";
+  private static String CLIENT_RETRIES_NUMBER =
+    "hbase.region.replica.replication.client.retries.number";
 
   private Configuration conf;
-  private ClusterConnection connection;
+  private AsyncClusterConnection connection;
   private TableDescriptors tableDescriptors;
 
-  // Reuse WALSplitter constructs as a WAL pipe
-  private PipelineController controller;
-  private RegionReplicaOutputSink outputSink;
-  private EntryBuffers entryBuffers;
+  private int numRetries;
+
+  private long operationTimeoutNs;
 
-  // Number of writer threads
-  private int numWriterThreads;
+  private LoadingCache<TableName, Optional<TableDescriptor>> 
tableDescriptorCache;
 
-  private int operationTimeout;
+  private Cache<TableName, TableName> disabledTableCache;
 
-  private ExecutorService pool;
+  private final RetryCounterFactory retryCounterFactory =
+    new RetryCounterFactory(Integer.MAX_VALUE, 1000, 60000);
 
   @Override
   public void init(Context context) throws IOException {
     super.init(context);
-
-    this.conf = HBaseConfiguration.create(context.getConfiguration());
+    this.conf = context.getConfiguration();
     this.tableDescriptors = context.getTableDescriptors();
-
+    int memstoreReplicationEnabledCacheExpiryMs = conf
+      
.getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs",
 5000);
+    // A cache for the table "memstore replication enabled" flag.
+    // It has a default expiry of 5 sec. This means that if the table is 
altered
+    // with a different flag value, we might miss to replicate for that amount 
of
+    // time. But this cache avoid the slow lookup and parsing of the 
TableDescriptor.
+    tableDescriptorCache = CacheBuilder.newBuilder()
+      .expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, 
TimeUnit.MILLISECONDS)
+      .initialCapacity(10).maximumSize(1000)
+      .build(new CacheLoader<TableName, Optional<TableDescriptor>>() {
+
+        @Override
+        public Optional<TableDescriptor> load(TableName tableName) throws 
Exception {
+          // check if the table requires memstore replication
+          // some unit-test drop the table, so we should do a bypass check and 
always replicate.
+          return Optional.ofNullable(tableDescriptors.get(tableName));
+        }
+      });
+    int nonExistentTableCacheExpiryMs =
+      
conf.getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs",
 5000);
+    // A cache for non existing tables that have a default expiry of 5 sec. 
This means that if the
+    // table is created again with the same name, we might miss to replicate 
for that amount of
+    // time. But this cache prevents overloading meta requests for every edit 
from a deleted file.
+    disabledTableCache = CacheBuilder.newBuilder()
+      .expireAfterWrite(nonExistentTableCacheExpiryMs, 
TimeUnit.MILLISECONDS).initialCapacity(10)
+      .maximumSize(1000).build();
     // HRS multiplies client retries by 10 globally for meta operations, but 
we do not want this.
     // We are resetting it here because we want default number of retries (35) 
rather than 10 times
     // that which makes very long retries for disabled tables etc.
@@ -123,516 +126,261 @@ public class RegionReplicaReplicationEndpoint extends 
HBaseReplicationEndpoint {
         HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER);
       defaultNumRetries = defaultNumRetries / mult; // reset if HRS has 
multiplied this already
     }
-
-    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
-    int numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries);
-    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numRetries);
-
-    this.numWriterThreads = this.conf.getInt(
-      "hbase.region.replica.replication.writer.threads", 3);
-    controller = new PipelineController();
-    entryBuffers = new EntryBuffers(controller,
-        this.conf.getLong("hbase.region.replica.replication.buffersize", 128 * 
1024 * 1024));
-
+    this.numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries);
     // use the regular RPC timeout for replica replication RPC's
-    this.operationTimeout = 
conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
-      HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
-  }
-
-  @Override
-  protected void doStart() {
-    try {
-      connection = (ClusterConnection) 
ConnectionFactory.createConnection(this.conf);
-      this.pool = getDefaultThreadPool(conf);
-      outputSink = new RegionReplicaOutputSink(controller, tableDescriptors, 
entryBuffers,
-        connection, pool, numWriterThreads, operationTimeout);
-      outputSink.startWriterThreads();
-      super.doStart();
-    } catch (IOException ex) {
-      LOG.warn("Received exception while creating connection :" + ex);
-      notifyFailed(ex);
-    }
-  }
-
-  @Override
-  protected void doStop() {
-    if (outputSink != null) {
-      try {
-        outputSink.finishWritingAndClose();
-      } catch (IOException ex) {
-        LOG.warn("Got exception while trying to close OutputSink", ex);
-      }
-    }
-    if (this.pool != null) {
-      this.pool.shutdownNow();
-      try {
-        // wait for 10 sec
-        boolean shutdown = this.pool.awaitTermination(10000, 
TimeUnit.MILLISECONDS);
-        if (!shutdown) {
-          LOG.warn("Failed to shutdown the thread pool after 10 seconds");
-        }
-      } catch (InterruptedException e) {
-        LOG.warn("Got interrupted while waiting for the thread pool to shut 
down" + e);
-      }
-    }
-    if (connection != null) {
-      try {
-        connection.close();
-      } catch (IOException ex) {
-        LOG.warn("Got exception closing connection :" + ex);
-      }
-    }
-    super.doStop();
+    this.operationTimeoutNs =
+      
TimeUnit.MILLISECONDS.toNanos(conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
+    this.connection = context.getServer().getAsyncClusterConnection();
   }
 
   /**
-   * Returns a Thread pool for the RPC's to region replicas. Similar to
-   * Connection's thread pool.
+   * returns true if the specified entry must be replicated. We should always 
replicate meta
+   * operations (e.g. flush) and use the user HTD flag to decide whether or 
not replicate the
+   * memstore.
    */
-  private ExecutorService getDefaultThreadPool(Configuration conf) {
-    int maxThreads = 
conf.getInt("hbase.region.replica.replication.threads.max", 256);
-    if (maxThreads == 0) {
-      maxThreads = Runtime.getRuntime().availableProcessors() * 8;
+  private boolean requiresReplication(Optional<TableDescriptor> tableDesc, 
Entry entry) {
+    // empty edit does not need to be replicated
+    if (entry.getEdit().isEmpty() || !tableDesc.isPresent()) {
+      return false;
     }
-    long keepAliveTime = 
conf.getLong("hbase.region.replica.replication.threads.keepalivetime", 60);
-    LinkedBlockingQueue<Runnable> workQueue =
-        new LinkedBlockingQueue<>(maxThreads *
-            conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
-              HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
-    ThreadPoolExecutor tpe = new ThreadPoolExecutor(
-      maxThreads,
-      maxThreads,
-      keepAliveTime,
-      TimeUnit.SECONDS,
-      workQueue,
-      Threads.newDaemonThreadFactory(this.getClass().getSimpleName() + 
"-rpc-shared-"));
-    tpe.allowCoreThreadTimeOut(true);
-    return tpe;
+    // meta edits (e.g. flush) must be always replicated
+    return entry.getEdit().isMetaEdit() || 
tableDesc.get().hasRegionMemStoreReplication();
   }
 
-  @Override
-  public boolean replicate(ReplicateContext replicateContext) {
-    /* A note on batching in RegionReplicaReplicationEndpoint (RRRE):
-     *
-     * RRRE relies on batching from two different mechanisms. The first is the 
batching from
-     * ReplicationSource since RRRE is a ReplicationEndpoint driven by RS. RS 
reads from a single
-     * WAL file filling up a buffer of heap size 
"replication.source.size.capacity"(64MB) or at most
-     * "replication.source.nb.capacity" entries or until it sees the end of 
file (in live tailing).
-     * Then RS passes all the buffered edits in this replicate() call context. 
RRRE puts the edits
-     * to the WALSplitter.EntryBuffers which is a blocking buffer space of up 
to
-     * "hbase.region.replica.replication.buffersize" (128MB) in size. This 
buffer splits the edits
-     * based on regions.
-     *
-     * There are "hbase.region.replica.replication.writer.threads"(default 3) 
writer threads which
-     * pick largest per-region buffer and send it to the SinkWriter (see 
RegionReplicaOutputSink).
-     * The SinkWriter in this case will send the wal edits to all secondary 
region replicas in
-     * parallel via a retrying rpc call. EntryBuffers guarantees that while a 
buffer is
-     * being written to the sink, another buffer for the same region will not 
be made available to
-     * writers ensuring regions edits are not replayed out of order.
-     *
-     * The replicate() call won't return until all the buffers are sent and 
ack'd by the sinks so
-     * that the replication can assume all edits are persisted. We may be able 
to do a better
-     * pipelining between the replication thread and output sinks later if it 
becomes a bottleneck.
-     */
-
-    while (this.isRunning()) {
-      try {
-        for (Entry entry: replicateContext.getEntries()) {
-          entryBuffers.appendEntry(entry);
+  private void getRegionLocations(CompletableFuture<RegionLocations> future,
+      TableDescriptor tableDesc, byte[] encodedRegionName, byte[] row, boolean 
reload) {
+    
FutureUtils.addListener(connection.getRegionLocations(tableDesc.getTableName(), 
row, reload),
+      (r, e) -> {
+        if (e != null) {
+          future.completeExceptionally(e);
+          return;
         }
-        outputSink.flush(); // make sure everything is flushed
-        ctx.getMetrics().incrLogEditsFiltered(
-          outputSink.getSkippedEditsCounter().getAndSet(0));
-        return true;
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        return false;
-      } catch (IOException e) {
-        LOG.warn("Received IOException while trying to replicate"
-            + StringUtils.stringifyException(e));
-      }
-    }
-
-    return false;
-  }
-
-  @Override
-  public boolean canReplicateToSameCluster() {
-    return true;
-  }
-
-  @Override
-  protected WALEntryFilter getScopeWALEntryFilter() {
-    // we do not care about scope. We replicate everything.
-    return null;
+        // if we are not loading from cache, just return
+        if (reload) {
+          future.complete(r);
+          return;
+        }
+        // check if the number of region replicas is correct, and also the 
primary region name
+        // matches
+        if (r.size() == tableDesc.getRegionReplication() && Bytes.equals(
+          r.getDefaultRegionLocation().getRegion().getEncodedNameAsBytes(), 
encodedRegionName)) {
+          future.complete(r);
+        } else {
+          // reload again as the information in cache maybe stale
+          getRegionLocations(future, tableDesc, encodedRegionName, row, true);
+        }
+      });
   }
 
-  static class RegionReplicaOutputSink extends OutputSink {
-    private final RegionReplicaSinkWriter sinkWriter;
-    private final TableDescriptors tableDescriptors;
-    private final Cache<TableName, Boolean> memstoreReplicationEnabled;
-
-    public RegionReplicaOutputSink(PipelineController controller, 
TableDescriptors tableDescriptors,
-        EntryBuffers entryBuffers, ClusterConnection connection, 
ExecutorService pool,
-        int numWriters, int operationTimeout) {
-      super(controller, entryBuffers, numWriters);
-      this.sinkWriter =
-          new RegionReplicaSinkWriter(this, connection, pool, 
operationTimeout, tableDescriptors);
-      this.tableDescriptors = tableDescriptors;
-
-      // A cache for the table "memstore replication enabled" flag.
-      // It has a default expiry of 5 sec. This means that if the table is 
altered
-      // with a different flag value, we might miss to replicate for that 
amount of
-      // time. But this cache avoid the slow lookup and parsing of the 
TableDescriptor.
-      int memstoreReplicationEnabledCacheExpiryMs = 
connection.getConfiguration()
-        
.getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs",
 5000);
-      this.memstoreReplicationEnabled = CacheBuilder.newBuilder()
-        .expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, 
TimeUnit.MILLISECONDS)
-        .initialCapacity(10)
-        .maximumSize(1000)
-        .build();
+  private void replicate(CompletableFuture<Long> future, RegionLocations locs,
+      TableDescriptor tableDesc, byte[] encodedRegionName, byte[] row, 
List<Entry> entries) {
+    if (locs.size() == 1) {
+      // Could this happen?
+      future.complete(Long.valueOf(entries.size()));
+      return;
     }
-
-    @Override
-    public void append(RegionEntryBuffer buffer) throws IOException {
-      List<Entry> entries = buffer.getEntryBuffer();
-
-      if (entries.isEmpty() || entries.get(0).getEdit().getCells().isEmpty()) {
-        return;
-      }
-
-      // meta edits (e.g. flush) are always replicated.
-      // data edits (e.g. put) are replicated if the table requires them.
-      if (!requiresReplication(buffer.getTableName(), entries)) {
-        return;
+    if 
(!Bytes.equals(locs.getDefaultRegionLocation().getRegion().getEncodedNameAsBytes(),
+      encodedRegionName)) {
+      // the region name is not equal, this usually means the region has been 
split or merged, so
+      // give up replicating as the new region(s) should already have all the 
data of the parent
+      // region(s).
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(
+          "Skipping {} entries in table {} because located region {} is 
different than" +
+            " the original region {} from WALEdit",
+          tableDesc.getTableName(), 
locs.getDefaultRegionLocation().getRegion().getEncodedName(),
+          Bytes.toStringBinary(encodedRegionName));
       }
-
-      sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(),
-        CellUtil.cloneRow(entries.get(0).getEdit().getCells().get(0)), 
entries);
-    }
-
-    @Override
-    public boolean flush() throws IOException {
-      // nothing much to do for now. Wait for the Writer threads to finish up
-      // append()'ing the data.
-      entryBuffers.waitUntilDrained();
-      return super.flush();
-    }
-
-    @Override
-    public boolean keepRegionEvent(Entry entry) {
-      return true;
-    }
-
-    @Override
-    public List<Path> finishWritingAndClose() throws IOException {
-      finishWriting(true);
-      return null;
-    }
-
-    @Override
-    public Map<byte[], Long> getOutputCounts() {
-      return null; // only used in tests
-    }
-
-    @Override
-    public int getNumberOfRecoveredRegions() {
-      return 0;
-    }
-
-    AtomicLong getSkippedEditsCounter() {
-      return skippedEdits;
+      future.complete(Long.valueOf(entries.size()));
+      return;
     }
-
-    /**
-     * returns true if the specified entry must be replicated.
-     * We should always replicate meta operations (e.g. flush)
-     * and use the user HTD flag to decide whether or not replicate the 
memstore.
-     */
-    private boolean requiresReplication(final TableName tableName, final 
List<Entry> entries)
-        throws IOException {
-      // unit-tests may not the TableDescriptors, bypass the check and always 
replicate
-      if (tableDescriptors == null) return true;
-
-      Boolean requiresReplication = 
memstoreReplicationEnabled.getIfPresent(tableName);
-      if (requiresReplication == null) {
-        // check if the table requires memstore replication
-        // some unit-test drop the table, so we should do a bypass check and 
always replicate.
-        TableDescriptor htd = tableDescriptors.get(tableName);
-        requiresReplication = htd == null || 
htd.hasRegionMemStoreReplication();
-        memstoreReplicationEnabled.put(tableName, requiresReplication);
-      }
-
-      // if memstore replication is not required, check the entries.
-      // meta edits (e.g. flush) must be always replicated.
-      if (!requiresReplication) {
-        int skipEdits = 0;
-        java.util.Iterator<Entry> it = entries.iterator();
-        while (it.hasNext()) {
-          Entry entry = it.next();
-          if (entry.getEdit().isMetaEdit()) {
-            requiresReplication = true;
+    AtomicReference<Throwable> error = new AtomicReference<>();
+    AtomicInteger remainingTasks = new AtomicInteger(locs.size() - 1);
+    AtomicLong skippedEdits = new AtomicLong(0);
+
+    for (int i = 1, n = locs.size(); i < n; i++) {
+      final int replicaId = i;
+      FutureUtils.addListener(connection.replay(tableDesc.getTableName(),
+        locs.getRegionLocation(replicaId).getRegion().getEncodedNameAsBytes(), 
row, entries,
+        replicaId, numRetries, operationTimeoutNs), (r, e) -> {
+          if (e != null) {
+            LOG.warn("Failed to replicate to {}", 
locs.getRegionLocation(replicaId), e);
+            error.compareAndSet(null, e);
           } else {
-            it.remove();
-            skipEdits++;
+            AtomicUtils.updateMax(skippedEdits, r.longValue());
           }
-        }
-        skippedEdits.addAndGet(skipEdits);
-      }
-      return requiresReplication;
+          if (remainingTasks.decrementAndGet() == 0) {
+            if (error.get() != null) {
+              future.completeExceptionally(error.get());
+            } else {
+              future.complete(skippedEdits.get());
+            }
+          }
+        });
     }
   }
 
-  static class RegionReplicaSinkWriter extends SinkWriter {
-    RegionReplicaOutputSink sink;
-    ClusterConnection connection;
-    RpcControllerFactory rpcControllerFactory;
-    RpcRetryingCallerFactory rpcRetryingCallerFactory;
-    int operationTimeout;
-    ExecutorService pool;
-    Cache<TableName, Boolean> disabledAndDroppedTables;
-    TableDescriptors tableDescriptors;
-
-    public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, 
ClusterConnection connection,
-        ExecutorService pool, int operationTimeout, TableDescriptors 
tableDescriptors) {
-      this.sink = sink;
-      this.connection = connection;
-      this.operationTimeout = operationTimeout;
-      this.rpcRetryingCallerFactory
-        = RpcRetryingCallerFactory.instantiate(connection.getConfiguration());
-      this.rpcControllerFactory = 
RpcControllerFactory.instantiate(connection.getConfiguration());
-      this.pool = pool;
-      this.tableDescriptors = tableDescriptors;
-
-      int nonExistentTableCacheExpiryMs = connection.getConfiguration()
-        
.getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs",
 5000);
-      // A cache for non existing tables that have a default expiry of 5 sec. 
This means that if the
-      // table is created again with the same name, we might miss to replicate 
for that amount of
-      // time. But this cache prevents overloading meta requests for every 
edit from a deleted file.
-      disabledAndDroppedTables = CacheBuilder.newBuilder()
-        .expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS)
-        .initialCapacity(10)
-        .maximumSize(1000)
-        .build();
+  private void logSkipped(TableName tableName, List<Entry> entries, String 
reason) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Skipping {} entries because table {} is {}", entries.size(), 
tableName, reason);
+      for (Entry entry : entries) {
+        LOG.trace("Skipping : {}", entry);
+      }
     }
+  }
 
-    public void append(TableName tableName, byte[] encodedRegionName, byte[] 
row,
-        List<Entry> entries) throws IOException {
-
-      if (disabledAndDroppedTables.getIfPresent(tableName) != null) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Skipping " + entries.size() + " entries because table " + 
tableName
-            + " is cached as a disabled or dropped table");
-          for (Entry entry : entries) {
-            LOG.trace("Skipping : " + entry);
-          }
-        }
-        sink.getSkippedEditsCounter().addAndGet(entries.size());
-        return;
+  private CompletableFuture<Long> replicate(TableDescriptor tableDesc, byte[] 
encodedRegionName,
+      List<Entry> entries) {
+    if (disabledTableCache.getIfPresent(tableDesc.getTableName()) != null) {
+      logSkipped(tableDesc.getTableName(), entries, "cached as a disabled 
table");
+      return CompletableFuture.completedFuture(Long.valueOf(entries.size()));
+    }
+    byte[] row = CellUtil.cloneRow(entries.get(0).getEdit().getCells().get(0));
+    CompletableFuture<RegionLocations> locateFuture = new 
CompletableFuture<>();
+    getRegionLocations(locateFuture, tableDesc, encodedRegionName, row, false);
+    CompletableFuture<Long> future = new CompletableFuture<>();
+    FutureUtils.addListener(locateFuture, (locs, error) -> {
+      if (error != null) {
+        future.completeExceptionally(error);
+      } else {
+        replicate(future, locs, tableDesc, encodedRegionName, row, entries);
       }
+    });
+    return future;
+  }
 
-      // If the table is disabled or dropped, we should not replay the 
entries, and we can skip
-      // replaying them. However, we might not know whether the table is 
disabled until we
-      // invalidate the cache and check from meta
-      RegionLocations locations = null;
-      boolean useCache = true;
-      while (true) {
-        // get the replicas of the primary region
+  @Override
+  public boolean replicate(ReplicateContext replicateContext) {
+    Map<byte[], Pair<TableDescriptor, List<Entry>>> encodedRegionName2Entries =
+      new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    long skippedEdits = 0;
+    RetryCounter retryCounter = retryCounterFactory.create();
+    outer: while (isRunning()) {
+      encodedRegionName2Entries.clear();
+      skippedEdits = 0;
+      for (Entry entry : replicateContext.getEntries()) {
+        Optional<TableDescriptor> tableDesc;
         try {
-          locations = RegionReplicaReplayCallable
-              .getRegionLocations(connection, tableName, row, useCache, 0);
-
-          if (locations == null) {
-            throw new HBaseIOException("Cannot locate locations for "
-                + tableName + ", row:" + Bytes.toStringBinary(row));
+          tableDesc = tableDescriptorCache.get(entry.getKey().getTableName());
+        } catch (ExecutionException e) {
+          LOG.warn("Failed to load table descriptor for {}, attempts={}",
+            entry.getKey().getTableName(), retryCounter.getAttemptTimes(), 
e.getCause());
+          if (!retryCounter.shouldRetry()) {
+            return false;
           }
-        } catch (TableNotFoundException e) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Skipping " + entries.size() + " entries because table " 
+ tableName
-              + " is dropped. Adding table to cache.");
-            for (Entry entry : entries) {
-              LOG.trace("Skipping : " + entry);
-            }
+          try {
+            retryCounter.sleepUntilNextRetry();
+          } catch (InterruptedException e1) {
+            // restore the interrupted state
+            Thread.currentThread().interrupt();
+            return false;
           }
-          disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to 
cache. Value ignored
-          // skip this entry
-          sink.getSkippedEditsCounter().addAndGet(entries.size());
-          return;
+          continue outer;
         }
-
-        // check whether we should still replay this entry. If the regions are 
changed, or the
-        // entry is not coming from the primary region, filter it out.
-        HRegionLocation primaryLocation = locations.getDefaultRegionLocation();
-        if 
(!Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(),
-          encodedRegionName)) {
-          if (useCache) {
-            useCache = false;
-            continue; // this will retry location lookup
-          }
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Skipping " + entries.size() + " entries in table " + 
tableName
-              + " because located region " + 
primaryLocation.getRegionInfo().getEncodedName()
-              + " is different than the original region " + 
Bytes.toStringBinary(encodedRegionName)
-              + " from WALEdit");
-            for (Entry entry : entries) {
-              LOG.trace("Skipping : " + entry);
-            }
-          }
-          sink.getSkippedEditsCounter().addAndGet(entries.size());
-          return;
+        if (!requiresReplication(tableDesc, entry)) {
+          skippedEdits++;
+          continue;
         }
-        break;
+        byte[] encodedRegionName = entry.getKey().getEncodedRegionName();
+        encodedRegionName2Entries
+          .computeIfAbsent(encodedRegionName, k -> 
Pair.newPair(tableDesc.get(), new ArrayList<>()))
+          .getSecond().add(entry);
       }
-
-      if (locations.size() == 1) {
-        return;
-      }
-
-      ArrayList<Future<ReplicateWALEntryResponse>> tasks = new 
ArrayList<>(locations.size() - 1);
-
-      // All passed entries should belong to one region because it is coming 
from the EntryBuffers
-      // split per region. But the regions might split and merge (unlike log 
recovery case).
-      for (int replicaId = 0; replicaId < locations.size(); replicaId++) {
-        HRegionLocation location = locations.getRegionLocation(replicaId);
-        if (!RegionReplicaUtil.isDefaultReplica(replicaId)) {
-          RegionInfo regionInfo = location == null
-              ? RegionReplicaUtil.getRegionInfoForReplica(
-                locations.getDefaultRegionLocation().getRegionInfo(), 
replicaId)
-              : location.getRegionInfo();
-          RegionReplicaReplayCallable callable = new 
RegionReplicaReplayCallable(connection,
-            rpcControllerFactory, tableName, location, regionInfo, row, 
entries,
-            sink.getSkippedEditsCounter());
-           Future<ReplicateWALEntryResponse> task = pool.submit(
-             new RetryingRpcCallable<>(rpcRetryingCallerFactory, callable, 
operationTimeout));
-           tasks.add(task);
-        }
+      break;
+    }
+    // send the request to regions
+    retryCounter = retryCounterFactory.create();
+    while (isRunning()) {
+      List<Pair<CompletableFuture<Long>, byte[]>> 
futureAndEncodedRegionNameList =
+        new ArrayList<Pair<CompletableFuture<Long>, byte[]>>();
+      for (Map.Entry<byte[], Pair<TableDescriptor, List<Entry>>> entry : 
encodedRegionName2Entries
+        .entrySet()) {
+        CompletableFuture<Long> future =
+          replicate(entry.getValue().getFirst(), entry.getKey(), 
entry.getValue().getSecond());
+        futureAndEncodedRegionNameList.add(Pair.newPair(future, 
entry.getKey()));
       }
-
-      boolean tasksCancelled = false;
-      for (int replicaId = 0; replicaId < tasks.size(); replicaId++) {
+      for (Pair<CompletableFuture<Long>, byte[]> pair : 
futureAndEncodedRegionNameList) {
+        byte[] encodedRegionName = pair.getSecond();
         try {
-          tasks.get(replicaId).get();
+          skippedEdits += pair.getFirst().get().longValue();
+          encodedRegionName2Entries.remove(encodedRegionName);
         } catch (InterruptedException e) {
-          throw new InterruptedIOException(e.getMessage());
+          // restore the interrupted state
+          Thread.currentThread().interrupt();
+          return false;
         } catch (ExecutionException e) {
+          Pair<TableDescriptor, List<Entry>> tableAndEntries =
+            encodedRegionName2Entries.get(encodedRegionName);
+          TableName tableName = tableAndEntries.getFirst().getTableName();
+          List<Entry> entries = tableAndEntries.getSecond();
           Throwable cause = e.getCause();
-          boolean canBeSkipped = false;
-          if (cause instanceof IOException) {
-            // The table can be disabled or dropped at this time. For disabled 
tables, we have no
-            // cheap mechanism to detect this case because meta does not 
contain this information.
-            // ClusterConnection.isTableDisabled() is a zk call which we 
cannot do for every replay
-            // RPC. So instead we start the replay RPC with retries and check 
whether the table is
-            // dropped or disabled which might cause SocketTimeoutException, or
-            // RetriesExhaustedException or similar if we get IOE.
-            if (cause instanceof TableNotFoundException
-                || connection.isTableDisabled(tableName)) {
-              disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to 
cache for later.
-              canBeSkipped = true;
-            } else if (tableDescriptors != null) {
-              TableDescriptor tableDescriptor = 
tableDescriptors.get(tableName);
-              if (tableDescriptor != null
-                  //(replicaId + 1) as no task is added for primary replica 
for replication
-                  && tableDescriptor.getRegionReplication() <= (replicaId + 
1)) {
-                canBeSkipped = true;
-              }
-            }
-            if (canBeSkipped) {
-              if (LOG.isTraceEnabled()) {
-                LOG.trace("Skipping " + entries.size() + " entries in table " 
+ tableName
-                    + " because received exception for dropped or disabled 
table",
-                  cause);
-                for (Entry entry : entries) {
-                  LOG.trace("Skipping : " + entry);
-                }
-              }
-              if (!tasksCancelled) {
-                sink.getSkippedEditsCounter().addAndGet(entries.size());
-                tasksCancelled = true; // so that we do not add to skipped 
counter again
-              }
-              continue;
-            }
-
-            // otherwise rethrow
-            throw (IOException)cause;
+          // The table can be disabled or dropped at this time. For disabled 
tables, we have no
+          // cheap mechanism to detect this case because meta does not contain 
this information.
+          // ClusterConnection.isTableDisabled() is a zk call which we cannot 
do for every replay
+          // RPC. So instead we start the replay RPC with retries and check 
whether the table is
+          // dropped or disabled which might cause SocketTimeoutException, or
+          // RetriesExhaustedException or similar if we get IOE.
+          if (cause instanceof TableNotFoundException) {
+            // add to cache that the table does not exist
+            tableDescriptorCache.put(tableName, Optional.empty());
+            logSkipped(tableName, entries, "dropped");
+            skippedEdits += entries.size();
+            encodedRegionName2Entries.remove(encodedRegionName);
+            continue;
+          }
+          boolean disabled = false;
+          try {
+            disabled = connection.getAdmin().isTableDisabled(tableName).get();
+          } catch (InterruptedException e1) {
+            // restore the interrupted state
+            Thread.currentThread().interrupt();
+            return false;
+          } catch (ExecutionException e1) {
+            LOG.warn("Failed to test whether {} is disabled, assume it is not 
disabled", tableName,
+              e1.getCause());
+          }
+          if (disabled) {
+            disabledTableCache.put(tableName, tableName);
+            logSkipped(tableName, entries, "disabled");
+            skippedEdits += entries.size();
+            encodedRegionName2Entries.remove(encodedRegionName);
+            continue;
           }
-          // unexpected exception
-          throw new IOException(cause);
+          LOG.warn("Failed to replicate {} entries for region {} of table {}", 
entries.size(),
+            Bytes.toStringBinary(encodedRegionName), tableName);
+        }
+      }
+      // we have done
+      if (encodedRegionName2Entries.isEmpty()) {
+        ctx.getMetrics().incrLogEditsFiltered(skippedEdits);
+        return true;
+      } else {
+        LOG.warn("Failed to replicate all entris, retry={}", 
retryCounter.getAttemptTimes());
+        if (!retryCounter.shouldRetry()) {
+          return false;
+        }
+        try {
+          retryCounter.sleepUntilNextRetry();
+        } catch (InterruptedException e) {
+          // restore the interrupted state
+          Thread.currentThread().interrupt();
+          return false;
         }
       }
     }
-  }
 
-  static class RetryingRpcCallable<V> implements Callable<V> {
-    RpcRetryingCallerFactory factory;
-    RetryingCallable<V> callable;
-    int timeout;
-    public RetryingRpcCallable(RpcRetryingCallerFactory factory, 
RetryingCallable<V> callable,
-        int timeout) {
-      this.factory = factory;
-      this.callable = callable;
-      this.timeout = timeout;
-    }
-    @Override
-    public V call() throws Exception {
-      return factory.<V>newCaller().callWithRetries(callable, timeout);
-    }
+    return false;
   }
 
-  /**
-   * Calls replay on the passed edits for the given set of entries belonging 
to the region. It skips
-   * the entry if the region boundaries have changed or the region is gone.
-   */
-  static class RegionReplicaReplayCallable extends
-      RegionAdminServiceCallable<ReplicateWALEntryResponse> {
-    private final List<Entry> entries;
-    private final byte[] initialEncodedRegionName;
-    private final AtomicLong skippedEntries;
-
-    public RegionReplicaReplayCallable(ClusterConnection connection,
-        RpcControllerFactory rpcControllerFactory, TableName tableName,
-        HRegionLocation location, RegionInfo regionInfo, byte[] 
row,List<Entry> entries,
-        AtomicLong skippedEntries) {
-      super(connection, rpcControllerFactory, location, tableName, row, 
regionInfo.getReplicaId());
-      this.entries = entries;
-      this.skippedEntries = skippedEntries;
-      this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes();
-    }
-
-    @Override
-    public ReplicateWALEntryResponse call(HBaseRpcController controller) 
throws Exception {
-      // Check whether we should still replay this entry. If the regions are 
changed, or the
-      // entry is not coming form the primary region, filter it out because we 
do not need it.
-      // Regions can change because of (1) region split (2) region merge (3) 
table recreated
-      boolean skip = false;
-      if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(),
-          initialEncodedRegionName)) {
-        skip = true;
-      }
-      if (!this.entries.isEmpty() && !skip) {
-        Entry[] entriesArray = new Entry[this.entries.size()];
-        entriesArray = this.entries.toArray(entriesArray);
-
-        // set the region name for the target region replica
-        Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
-            ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray, 
location
-                .getRegionInfo().getEncodedNameAsBytes(), null, null, null);
-        controller.setCellScanner(p.getSecond());
-        return stub.replay(controller, p.getFirst());
-      }
+  @Override
+  public boolean canReplicateToSameCluster() {
+    return true;
+  }
 
-      if (skip) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Skipping " + entries.size() + " entries in table " + 
tableName
-            + " because located region " + 
location.getRegionInfo().getEncodedName()
-            + " is different than the original region "
-            + Bytes.toStringBinary(initialEncodedRegionName) + " from 
WALEdit");
-          for (Entry entry : entries) {
-            LOG.trace("Skipping : " + entry);
-          }
-        }
-        skippedEntries.addAndGet(entries.size());
-      }
-      return ReplicateWALEntryResponse.newBuilder().build();
-    }
+  @Override
+  protected WALEntryFilter getScopeWALEntryFilter() {
+    // we do not care about scope. We replicate everything.
+    return null;
   }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index f1b6e76..c453e21 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -282,7 +282,7 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
       tableDescriptors = ((HRegionServer) server).getTableDescriptors();
     }
     replicationEndpoint
-      .init(new ReplicationEndpoint.Context(conf, 
replicationPeer.getConfiguration(), fs,
+      .init(new ReplicationEndpoint.Context(server, conf, 
replicationPeer.getConfiguration(), fs,
         replicationPeer.getId(), clusterId, replicationPeer, metrics, 
tableDescriptors, server));
     replicationEndpoint.start();
     replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
index e1e55f5..9af60c5 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
@@ -88,7 +88,7 @@ public class TestAsyncTableNoncedRetry {
       registry.getClusterId().get(), null, User.getCurrent()) {
 
       @Override
-      public NonceGenerator getNonceGenerator() {
+      NonceGenerator getNonceGenerator() {
         return NONCE_GENERATOR;
       }
     };
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
index 04db81a..017d7c9 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
@@ -20,16 +20,17 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.Cell.Type;
 import org.apache.hadoop.hbase.CellBuilderFactory;
@@ -42,7 +43,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionLocator;
@@ -51,12 +51,12 @@ import 
org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.testclassification.FlakeyTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -383,9 +383,8 @@ public class TestRegionReplicaReplicationEndpoint {
     testRegionReplicaReplicationIgnores(false, true);
   }
 
-  public void testRegionReplicaReplicationIgnores(boolean dropTable, boolean 
disableReplication)
+  private void testRegionReplicaReplicationIgnores(boolean dropTable, boolean 
disableReplication)
       throws Exception {
-
     // tests having edits from a disabled or dropped table is handled 
correctly by skipping those
     // entries and further edits after the edits from dropped/disabled table 
can be replicated
     // without problems.
@@ -405,8 +404,7 @@ public class TestRegionReplicaReplicationEndpoint {
     HTU.getAdmin().createTable(htd);
 
     // both tables are created, now pause replication
-    ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
-    admin.disablePeer(ServerRegionReplicaUtil.getReplicationPeerId());
+    
HTU.getAdmin().disableReplicationPeer(ServerRegionReplicaUtil.getReplicationPeerId());
 
     // now that the replication is disabled, write to the table to be dropped, 
then drop the table.
 
@@ -416,19 +414,9 @@ public class TestRegionReplicaReplicationEndpoint {
 
     HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtility.fam1, 6000, 
7000);
 
-    AtomicLong skippedEdits = new AtomicLong();
-    RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink =
-        mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class);
-    when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits);
-    FSTableDescriptors fstd = new FSTableDescriptors(HTU.getConfiguration(),
-        FileSystem.get(HTU.getConfiguration()), HTU.getDefaultRootDirPath());
-    RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter =
-        new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink,
-            (ClusterConnection) connection, 
Executors.newSingleThreadExecutor(), Integer.MAX_VALUE,
-            fstd);
     RegionLocator rl = connection.getRegionLocator(toBeDisabledTable);
     HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
-    byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes();
+    byte[] encodedRegionName = hrl.getRegion().getEncodedNameAsBytes();
 
     Cell cell = 
CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("A"))
         
.setFamily(HTU.fam1).setValue(Bytes.toBytes("VAL")).setType(Type.Put).build();
@@ -436,7 +424,6 @@ public class TestRegionReplicaReplicationEndpoint {
       new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1),
         new WALEdit()
             .add(cell));
-
     HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
     if (dropTable) {
       HTU.getAdmin().deleteTable(toBeDisabledTable);
@@ -445,11 +432,23 @@ public class TestRegionReplicaReplicationEndpoint {
       HTU.getAdmin().modifyTable(toBeDisabledTable, htd);
       HTU.getAdmin().enableTable(toBeDisabledTable);
     }
-    sinkWriter.append(toBeDisabledTable, encodedRegionName,
-      HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry));
-
-    assertEquals(2, skippedEdits.get());
 
+    HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(0);
+    MetricsSource metrics = mock(MetricsSource.class);
+    ReplicationEndpoint.Context ctx =
+      new ReplicationEndpoint.Context(rs, HTU.getConfiguration(), 
HTU.getConfiguration(),
+        HTU.getTestFileSystem(), 
ServerRegionReplicaUtil.getReplicationPeerId(),
+        UUID.fromString(rs.getClusterId()), 
rs.getReplicationSourceService().getReplicationPeers()
+          .getPeer(ServerRegionReplicaUtil.getReplicationPeerId()),
+        metrics, rs.getTableDescriptors(), rs);
+    RegionReplicaReplicationEndpoint rrpe = new 
RegionReplicaReplicationEndpoint();
+    rrpe.init(ctx);
+    rrpe.start();
+    ReplicationEndpoint.ReplicateContext repCtx = new 
ReplicationEndpoint.ReplicateContext();
+    repCtx.setEntries(Lists.newArrayList(entry, entry));
+    assertTrue(rrpe.replicate(repCtx));
+    verify(metrics, times(1)).incrLogEditsFiltered(eq(2L));
+    rrpe.stop();
     if (disableReplication) {
       // enable replication again so that we can verify replication
       HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
@@ -460,17 +459,14 @@ public class TestRegionReplicaReplicationEndpoint {
 
     try {
       // load some data to the to-be-dropped table
-
       // load the data to the table
       HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);
 
       // now enable the replication
-      admin.enablePeer(ServerRegionReplicaUtil.getReplicationPeerId());
+      
HTU.getAdmin().enableReplicationPeer(ServerRegionReplicaUtil.getReplicationPeerId());
 
       verifyReplication(tableName, regionReplication, 0, 1000);
-
     } finally {
-      admin.close();
       table.close();
       rl.close();
       tableToBeDisabled.close();
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
index ab67d94..de0f151 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
@@ -19,15 +19,16 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import static 
org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
 import static 
org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion;
-import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -37,24 +38,22 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.WALCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.WALObserver;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import 
org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
-import 
org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint.RegionReplicaReplayCallable;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
@@ -73,8 +72,6 @@ import org.junit.experimental.categories.Category;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
-
 /**
  * Tests RegionReplicaReplicationEndpoint. Unlike 
TestRegionReplicaReplicationEndpoint this
  * class contains lower level tests using callables.
@@ -178,39 +175,34 @@ public class TestRegionReplicaReplicationEndpointNoMaster 
{
   public void testReplayCallable() throws Exception {
     // tests replaying the edits to a secondary region replica using the 
Callable directly
     openRegion(HTU, rs0, hriSecondary);
-    ClusterConnection connection =
-        (ClusterConnection) 
ConnectionFactory.createConnection(HTU.getConfiguration());
 
-    //load some data to primary
+    // load some data to primary
     HTU.loadNumericRows(table, f, 0, 1000);
 
     Assert.assertEquals(1000, entries.size());
-    // replay the edits to the secondary using replay callable
-    replicateUsingCallable(connection, entries);
+    try (AsyncClusterConnection conn = ClusterConnectionFactory
+      .createAsyncClusterConnection(HTU.getConfiguration(), null, 
User.getCurrent())) {
+      // replay the edits to the secondary using replay callable
+      replicateUsingCallable(conn, entries);
+    }
 
     Region region = rs0.getRegion(hriSecondary.getEncodedName());
     HTU.verifyNumericRows(region, f, 0, 1000);
 
     HTU.deleteNumericRows(table, f, 0, 1000);
     closeRegion(HTU, rs0, hriSecondary);
-    connection.close();
   }
 
-  private void replicateUsingCallable(ClusterConnection connection, 
Queue<Entry> entries)
-      throws IOException, RuntimeException {
+  private void replicateUsingCallable(AsyncClusterConnection connection, 
Queue<Entry> entries)
+      throws IOException, ExecutionException, InterruptedException {
     Entry entry;
     while ((entry = entries.poll()) != null) {
       byte[] row = CellUtil.cloneRow(entry.getEdit().getCells().get(0));
-      RegionLocations locations = connection.locateRegion(tableName, row, 
true, true);
-      RegionReplicaReplayCallable callable = new 
RegionReplicaReplayCallable(connection,
-        RpcControllerFactory.instantiate(connection.getConfiguration()),
-        table.getName(), locations.getRegionLocation(1),
-        locations.getRegionLocation(1).getRegionInfo(), row, 
Lists.newArrayList(entry),
-        new AtomicLong());
-
-      RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(
-        connection.getConfiguration());
-      factory.<ReplicateWALEntryResponse> 
newCaller().callWithRetries(callable, 10000);
+      RegionLocations locations = connection.getRegionLocations(tableName, 
row, true).get();
+      connection
+        .replay(tableName, 
locations.getRegionLocation(1).getRegion().getEncodedNameAsBytes(), row,
+          Collections.singletonList(entry), 1, Integer.MAX_VALUE, 
TimeUnit.SECONDS.toNanos(10))
+        .get();
     }
   }
 
@@ -218,49 +210,49 @@ public class TestRegionReplicaReplicationEndpointNoMaster 
{
   public void testReplayCallableWithRegionMove() throws Exception {
     // tests replaying the edits to a secondary region replica using the 
Callable directly while
     // the region is moved to another location.It tests handling of RME.
-    openRegion(HTU, rs0, hriSecondary);
-    ClusterConnection connection =
-        (ClusterConnection) 
ConnectionFactory.createConnection(HTU.getConfiguration());
-    //load some data to primary
-    HTU.loadNumericRows(table, f, 0, 1000);
+    try (AsyncClusterConnection conn = ClusterConnectionFactory
+      .createAsyncClusterConnection(HTU.getConfiguration(), null, 
User.getCurrent())) {
+      openRegion(HTU, rs0, hriSecondary);
+      // load some data to primary
+      HTU.loadNumericRows(table, f, 0, 1000);
 
-    Assert.assertEquals(1000, entries.size());
-    // replay the edits to the secondary using replay callable
-    replicateUsingCallable(connection, entries);
+      Assert.assertEquals(1000, entries.size());
 
-    Region region = rs0.getRegion(hriSecondary.getEncodedName());
-    HTU.verifyNumericRows(region, f, 0, 1000);
+      // replay the edits to the secondary using replay callable
+      replicateUsingCallable(conn, entries);
 
-    HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to 
primary
+      Region region = rs0.getRegion(hriSecondary.getEncodedName());
+      HTU.verifyNumericRows(region, f, 0, 1000);
 
-    // move the secondary region from RS0 to RS1
-    closeRegion(HTU, rs0, hriSecondary);
-    openRegion(HTU, rs1, hriSecondary);
+      HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to 
primary
 
-    // replicate the new data
-    replicateUsingCallable(connection, entries);
+      // move the secondary region from RS0 to RS1
+      closeRegion(HTU, rs0, hriSecondary);
+      openRegion(HTU, rs1, hriSecondary);
 
-    region = rs1.getRegion(hriSecondary.getEncodedName());
-    // verify the new data. old data may or may not be there
-    HTU.verifyNumericRows(region, f, 1000, 2000);
+      // replicate the new data
+      replicateUsingCallable(conn, entries);
 
-    HTU.deleteNumericRows(table, f, 0, 2000);
-    closeRegion(HTU, rs1, hriSecondary);
-    connection.close();
+      region = rs1.getRegion(hriSecondary.getEncodedName());
+      // verify the new data. old data may or may not be there
+      HTU.verifyNumericRows(region, f, 1000, 2000);
+
+      HTU.deleteNumericRows(table, f, 0, 2000);
+      closeRegion(HTU, rs1, hriSecondary);
+    }
   }
 
   @Test
   public void testRegionReplicaReplicationEndpointReplicate() throws Exception 
{
     // tests replaying the edits to a secondary region replica using the 
RRRE.replicate()
     openRegion(HTU, rs0, hriSecondary);
-    ClusterConnection connection =
-        (ClusterConnection) 
ConnectionFactory.createConnection(HTU.getConfiguration());
     RegionReplicaReplicationEndpoint replicator = new 
RegionReplicaReplicationEndpoint();
 
     ReplicationEndpoint.Context context = 
mock(ReplicationEndpoint.Context.class);
     when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
     when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
-
+    when(context.getServer()).thenReturn(rs0);
+    when(context.getTableDescriptors()).thenReturn(rs0.getTableDescriptors());
     replicator.init(context);
     replicator.startAsync();
 
@@ -272,12 +264,11 @@ public class TestRegionReplicaReplicationEndpointNoMaster 
{
     final String fakeWalGroupId = "fakeWALGroup";
     replicator.replicate(new 
ReplicateContext().setEntries(Lists.newArrayList(entries))
         .setWalGroupId(fakeWalGroupId));
-
+    replicator.stop();
     Region region = rs0.getRegion(hriSecondary.getEncodedName());
     HTU.verifyNumericRows(region, f, 0, 1000);
 
     HTU.deleteNumericRows(table, f, 0, 1000);
     closeRegion(HTU, rs0, hriSecondary);
-    connection.close();
   }
 }

Reply via email to