This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 132fe1c1833584bfb09e4a1e18b13e5e7b5a9198
Author: zhangduo <zhang...@apache.org>
AuthorDate: Tue Jan 1 21:27:14 2019 +0800

    HBASE-21579 Use AsyncClusterConnection for HBaseInterClusterReplicationEndpoint
---
 .../hbase/client/AsyncRegionServerAdmin.java       | 14 +++++---
 .../hbase/protobuf/ReplicationProtbufUtil.java     | 35 ++++++++++---------
 .../HBaseInterClusterReplicationEndpoint.java      | 31 +++++++++--------
 .../regionserver/ReplicationSinkManager.java       | 40 ++++++++--------------
 .../hbase/replication/SyncReplicationTestBase.java | 12 +++---
 .../regionserver/TestReplicationSinkManager.java   | 21 +++++-------
 6 files changed, 74 insertions(+), 79 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
index 9accd89..b9141a9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -94,9 +95,9 @@ public class AsyncRegionServerAdmin {
     void call(AdminService.Interface stub, HBaseRpcController controller, RpcCallback<RESP> done);
   }
 
-  private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall) {
+  private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall, CellScanner cellScanner) {
     CompletableFuture<RESP> future = new CompletableFuture<>();
-    HBaseRpcController controller = conn.rpcControllerFactory.newController();
+    HBaseRpcController controller = conn.rpcControllerFactory.newController(cellScanner);
     try {
       rpcCall.call(conn.getAdminStub(server), controller, new RpcCallback<RESP>() {
@@ -115,6 +116,10 @@ public class AsyncRegionServerAdmin {
     return future;
   }
 
+  private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall) {
+    return call(rpcCall, null);
+  }
+
   public CompletableFuture<GetRegionInfoResponse> getRegionInfo(GetRegionInfoRequest request) {
     return call((stub, controller, done) -> stub.getRegionInfo(controller, request, done));
   }
@@ -154,8 +159,9 @@ public class AsyncRegionServerAdmin {
   }
 
   public CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry(
-      ReplicateWALEntryRequest request) {
-    return call((stub, controller, done) -> stub.replicateWALEntry(controller, request, done));
+      ReplicateWALEntryRequest request, CellScanner cellScanner) {
+    return call((stub, controller, done) -> stub.replicateWALEntry(controller, request, done),
+      cellScanner);
   }
 
   public CompletableFuture<ReplicateWALEntryResponse> replay(ReplicateWALEntryRequest request) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index c1b3911..74fad26 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -20,51 +20,54 @@ package org.apache.hadoop.hbase.protobuf;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.io.SizedCellScanner;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 
 @InterfaceAudience.Private
 public class ReplicationProtbufUtil {
+
   /**
-   * A helper to replicate a list of WAL entries using admin protocol.
-   * @param admin Admin service
+   * A helper to replicate a list of WAL entries using region server admin
+   * @param admin the region server admin
    * @param entries Array of WAL entries to be replicated
    * @param replicationClusterId Id which will uniquely identify source cluster FS client
    *          configurations in the replication configuration directory
   * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
   * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
-   * @throws java.io.IOException
    */
-  public static void replicateWALEntry(final AdminService.BlockingInterface admin,
-      final Entry[] entries, String replicationClusterId, Path sourceBaseNamespaceDir,
-      Path sourceHFileArchiveDir) throws IOException {
-    Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
-        buildReplicateWALEntryRequest(entries, null, replicationClusterId, sourceBaseNamespaceDir,
-          sourceHFileArchiveDir);
-    HBaseRpcController controller = new HBaseRpcControllerImpl(p.getSecond());
+  public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entries,
+      String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir)
+      throws IOException {
+    Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(
+      entries, null, replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
     try {
-      admin.replicateWALEntry(controller, p.getFirst());
-    } catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
-      throw ProtobufUtil.getServiceException(e);
+      admin.replicateWALEntry(p.getFirst(), p.getSecond()).get();
+    } catch (InterruptedException e) {
+      throw (IOException) new InterruptedIOException().initCause(e);
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      Throwables.propagateIfPossible(cause, IOException.class);
+      throw new IOException(e);
     }
   }
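For context, the new calling pattern in miniature. This is an illustrative sketch only, not part of the patch: it assumes a connected AsyncClusterConnection "conn", a sink ServerName "sink", and that buildReplicateWALEntryRequest is accessible to the caller.

    AsyncRegionServerAdmin admin = conn.getRegionServerAdmin(sink);
    // Pair the protobuf request with a CellScanner; the cells travel in the
    // RPC cell block (via the controller), not inside the protobuf message.
    Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
      ReplicationProtbufUtil.buildReplicateWALEntryRequest(entries, null,
        replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
    try {
      // Block on the future to keep the old synchronous contract.
      admin.replicateWALEntry(p.getFirst(), p.getSecond()).get();
    } catch (InterruptedException e) {
      throw (IOException) new InterruptedIOException().initCause(e);
    } catch (ExecutionException e) {
      // Unwrap the async failure back into the IOException callers expect.
      Throwables.propagateIfPossible(e.getCause(), IOException.class);
      throw new IOException(e);
    }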
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 7db53aa..0359096 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -39,7 +39,6 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -48,13 +47,16 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -65,8 +67,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
-
 /**
  * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
  * implementation for replicating to another HBase cluster.
@@ -85,8 +85,7 @@ public class HBaseInterClusterReplicationEndpoi
 
   private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
 
-  private ClusterConnection conn;
-  private Configuration localConf;
+  private AsyncClusterConnection conn;
   private Configuration conf;
   // How long should we sleep for each retry
   private long sleepForRetries;
@@ -117,7 +116,6 @@ public class HBaseInterClusterReplicationEndpoi
   public void init(Context context) throws IOException {
     super.init(context);
     this.conf = HBaseConfiguration.create(ctx.getConfiguration());
-    this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration());
     decorateConf();
     this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
     this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
@@ -132,12 +130,13 @@ public class HBaseInterClusterReplicationEndpoi
     // TODO: This connection is replication specific or we should make it particular to
     // replication and make replication specific settings such as compression or codec to use
     // passing Cells.
-    this.conn = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
+    this.conn =
+      ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent());
     this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
     this.metrics = context.getMetrics();
     // ReplicationQueueInfo parses the peerId out of the znode for us
-    this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
+    this.replicationSinkMgr = new ReplicationSinkManager(conn, this, this.conf);
     // per sink thread pool
     this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
       HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
@@ -284,9 +283,10 @@ public class HBaseInterClusterReplicationEndpoi
   }
 
   private void reconnectToPeerCluster() {
-    ClusterConnection connection = null;
+    AsyncClusterConnection connection = null;
     try {
-      connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
+      connection =
+        ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent());
     } catch (IOException ioe) {
       LOG.warn("Failed to create connection for peer cluster", ioe);
     }
@@ -367,7 +367,7 @@ public class HBaseInterClusterReplicationEndpoi
         }
         continue;
       }
-      if (this.conn == null || this.conn.isClosed()) {
+      if (this.conn == null) {
        reconnectToPeerCluster();
      }
      try {
@@ -480,10 +480,11 @@ public class HBaseInterClusterReplicationEndpoi
           entriesHashCode, entries.size(), size, replicationClusterId);
       }
       sinkPeer = replicationSinkMgr.getReplicationSink();
-      BlockingInterface rrs = sinkPeer.getRegionServer();
+      AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
       try {
-        ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
-          replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+        ReplicationProtbufUtil.replicateWALEntry(rsAdmin,
+          entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir,
+          hfileArchiveDir);
         LOG.trace("Completed replicating batch {}", entriesHashCode);
       } catch (IOException e) {
         LOG.trace("Failed replicating batch {}", entriesHashCode, e);
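One consequence worth noting: AsyncClusterConnection exposes no isClosed() probe, which is why the retry loop above now only null-checks conn before calling reconnectToPeerCluster(). A sketch of the resulting lifecycle, using the patch's own factory call; the helper name ensurePeerConnection is hypothetical:

    // Hypothetical helper: lazily (re)create the peer-cluster connection.
    // createAsyncClusterConnection takes the peer conf, an optional local
    // address (null here), and the user to connect as.
    private void ensurePeerConnection() throws IOException {
      if (this.conn == null) {
        this.conn =
          ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent());
      }
    }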
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
index 3cd7884..21b07ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
@@ -21,11 +21,11 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -35,8 +35,6 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
 
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-
 /**
  * Maintains a collection of peers to replicate to, and randomly selects a
  * single peer to replicate to per set of data to replicate. Also handles
@@ -61,9 +59,7 @@ public class ReplicationSinkManager {
 
   static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f;
 
-  private final Connection conn;
-
-  private final String peerClusterId;
+  private final AsyncClusterConnection conn;
 
   private final HBaseReplicationEndpoint endpoint;
 
@@ -77,8 +73,6 @@ public class ReplicationSinkManager {
   // replication sinks is refreshed
   private final int badSinkThreshold;
 
-  private final Random random;
-
   // A timestamp of the last time the list of replication peers changed
   private long lastUpdateToPeers;
 
@@ -88,26 +82,22 @@
   /**
    * Instantiate for a single replication peer cluster.
    * @param conn connection to the peer cluster
-   * @param peerClusterId identifier of the peer cluster
    * @param endpoint replication endpoint for inter cluster replication
   * @param conf HBase configuration, used for determining replication source ratio and bad peer
   *          threshold
   */
-  public ReplicationSinkManager(ClusterConnection conn, String peerClusterId,
-      HBaseReplicationEndpoint endpoint, Configuration conf) {
+  public ReplicationSinkManager(AsyncClusterConnection conn, HBaseReplicationEndpoint endpoint,
+      Configuration conf) {
     this.conn = conn;
-    this.peerClusterId = peerClusterId;
     this.endpoint = endpoint;
     this.badReportCounts = Maps.newHashMap();
     this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
-    this.badSinkThreshold = conf.getInt("replication.bad.sink.threshold",
-        DEFAULT_BAD_SINK_THRESHOLD);
-    this.random = new Random();
+    this.badSinkThreshold =
+      conf.getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD);
   }
 
   /**
    * Get a randomly-chosen replication sink to replicate to.
-   *
    * @return a replication sink to replicate to
    */
   public synchronized SinkPeer getReplicationSink() throws IOException {
@@ -119,8 +109,8 @@
     if (sinks.isEmpty()) {
       throw new IOException("No replication sinks are available");
     }
-    ServerName serverName = sinks.get(random.nextInt(sinks.size()));
-    return new SinkPeer(serverName, ((ClusterConnection) conn).getAdmin(serverName));
+    ServerName serverName = sinks.get(ThreadLocalRandom.current().nextInt(sinks.size()));
+    return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName));
   }
 
   /**
@@ -160,7 +150,7 @@
    */
   public synchronized void chooseSinks() {
     List<ServerName> slaveAddresses = endpoint.getRegionServers();
-    Collections.shuffle(slaveAddresses, random);
+    Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
     int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
     sinks = slaveAddresses.subList(0, numSinks);
     lastUpdateToPeers = System.currentTimeMillis();
@@ -182,9 +172,9 @@
    */
   public static class SinkPeer {
     private ServerName serverName;
-    private AdminService.BlockingInterface regionServer;
+    private AsyncRegionServerAdmin regionServer;
 
-    public SinkPeer(ServerName serverName, AdminService.BlockingInterface regionServer) {
+    public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) {
       this.serverName = serverName;
       this.regionServer = regionServer;
     }
@@ -193,10 +183,8 @@
       return serverName;
     }
 
-    public AdminService.BlockingInterface getRegionServer() {
+    public AsyncRegionServerAdmin getRegionServer() {
       return regionServer;
     }
-
   }
-
 }
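The class javadoc above still states the contract: pick a random sink per batch and rotate out sinks that accumulate bad reports. Against the new types the flow reads roughly as follows; "conn", "endpoint" and "conf" are assumed to be in scope, and the reporting calls mirror the tests further down:

    ReplicationSinkManager mgr = new ReplicationSinkManager(conn, endpoint, conf);
    SinkPeer peer = mgr.getReplicationSink();       // randomly-chosen healthy sink
    AsyncRegionServerAdmin admin = peer.getRegionServer();
    // ... replicate through admin, then report the outcome so that sinks
    // crossing replication.bad.sink.threshold get dropped on refresh:
    mgr.reportSinkSuccess(peer);                    // or mgr.reportBadSink(peer)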
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
index f373590..e0d112d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.StartMiniClusterOption;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
@@ -250,19 +250,19 @@ public class SyncReplicationTestBase {
   protected final void verifyReplicationRequestRejection(HBaseTestingUtility utility,
       boolean expectedRejection) throws Exception {
     HRegionServer regionServer = utility.getRSForFirstRegionInTable(TABLE_NAME);
-    ClusterConnection connection = regionServer.getClusterConnection();
+    AsyncClusterConnection connection = regionServer.getAsyncClusterConnection();
     Entry[] entries = new Entry[10];
     for (int i = 0; i < entries.length; i++) {
       entries[i] =
         new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
     }
     if (!expectedRejection) {
-      ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()),
-        entries, null, null, null);
+      ReplicationProtbufUtil.replicateWALEntry(
+        connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null);
     } else {
       try {
-        ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()),
-          entries, null, null, null);
+        ReplicationProtbufUtil.replicateWALEntry(
+          connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null);
         fail("Should throw IOException when sync-replication state is in A or DA");
       } catch (DoNotRetryIOException e) {
         assertTrue(e.getMessage().contains("Reject to apply to sink cluster"));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
index 39dabb4..60afd40 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
@@ -25,7 +25,8 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -37,8 +38,6 @@ import org.junit.experimental.categories.Category;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-
 @Category({ReplicationTests.class, SmallTests.class})
 public class TestReplicationSinkManager {
 
@@ -46,16 +45,14 @@ public class TestReplicationSinkManager {
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestReplicationSinkManager.class);
 
-  private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID";
-
   private HBaseReplicationEndpoint replicationEndpoint;
 
   private ReplicationSinkManager sinkManager;
 
   @Before
   public void setUp() {
     replicationEndpoint = mock(HBaseReplicationEndpoint.class);
-    sinkManager = new ReplicationSinkManager(mock(ClusterConnection.class),
-      PEER_CLUSTER_ID, replicationEndpoint, new Configuration());
+    sinkManager = new ReplicationSinkManager(mock(AsyncClusterConnection.class),
+      replicationEndpoint, new Configuration());
   }
 
   @Test
@@ -100,7 +97,7 @@
     // Sanity check
     assertEquals(1, sinkManager.getNumSinks());
 
-    SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
+    SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
 
     sinkManager.reportBadSink(sinkPeer);
@@ -131,7 +128,7 @@
 
     ServerName serverName = sinkManager.getSinksForTesting().get(0);
 
-    SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
+    SinkPeer sinkPeer = new SinkPeer(serverName, mock(AsyncRegionServerAdmin.class));
     sinkManager.reportSinkSuccess(sinkPeer); // has no effect, counter does not go negative
     for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
@@ -147,7 +144,7 @@
     //
     serverName = sinkManager.getSinksForTesting().get(0);
 
-    sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
+    sinkPeer = new SinkPeer(serverName, mock(AsyncRegionServerAdmin.class));
     for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-1; i++) {
       sinkManager.reportBadSink(sinkPeer);
     }
@@ -188,8 +185,8 @@
     ServerName serverNameA = sinkList.get(0);
     ServerName serverNameB = sinkList.get(1);
 
-    SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
-    SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AdminService.BlockingInterface.class));
+    SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
+    SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class));
 
     for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
       sinkManager.reportBadSink(sinkPeerA);
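The test-side change is mechanical: every mock(AdminService.BlockingInterface.class) becomes mock(AsyncRegionServerAdmin.class), with no change to the assertions. The pattern, recapped as a sketch; the ServerName used here is hypothetical:

    SinkPeer fakeSink = new SinkPeer(
      ServerName.valueOf("example.org", 16020, 1L), // hypothetical sink server
      mock(AsyncRegionServerAdmin.class));
    sinkManager.reportBadSink(fakeSink);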