This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 662f5e144d2591e27dadcdf46284b3266e92fc88
Author: XinSun <ddu...@gmail.com>
AuthorDate: Sun Sep 20 10:54:43 2020 +0800

    HBASE-24684 Fetch ReplicationSink servers list from HMaster instead of ZooKeeper (#2077)
    
    Signed-off-by: Wellington Chevreuil <wchevre...@apache.org>
---
 .../src/main/protobuf/server/master/Master.proto   |  12 +-
 .../hadoop/hbase/coprocessor/MasterObserver.java   |  16 +++
 .../org/apache/hadoop/hbase/master/HMaster.java    |   5 +
 .../hadoop/hbase/master/MasterCoprocessorHost.java |  18 +++
 .../hadoop/hbase/master/MasterRpcServices.java     |  21 +++
 .../apache/hadoop/hbase/master/MasterServices.java |   6 +
 .../replication/HBaseReplicationEndpoint.java      | 146 +++++++++++++++++++--
 .../regionserver/ReplicationSource.java            |   4 +-
 .../regionserver/ReplicationSourceShipper.java     |   2 +-
 .../hbase/master/MockNoopMasterServices.java       |   5 +
 .../replication/TestHBaseReplicationEndpoint.java  |   5 +
 .../replication/TestReplicationFetchServers.java   | 106 +++++++++++++++
 .../TestGlobalReplicationThrottler.java            |   4 +
 ...stRegionReplicaReplicationEndpointNoMaster.java |   2 +
 .../regionserver/TestReplicationSource.java        |   4 +-
 15 files changed, 337 insertions(+), 19 deletions(-)

diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto 
b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
index 118ce77..7dec566 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
@@ -717,6 +717,13 @@ message BalancerDecisionsResponse {
   repeated BalancerDecision balancer_decision = 1;
 }
 
+message ListReplicationSinkServersRequest {
+}
+
+message ListReplicationSinkServersResponse {
+  repeated ServerName server_name = 1;
+}
+
 service MasterService {
   /** Used by the client to get the number of regions that have received the 
updated schema */
   rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
@@ -1146,10 +1153,13 @@ service MasterService {
     returns (RenameRSGroupResponse);
 
   rpc UpdateRSGroupConfig(UpdateRSGroupConfigRequest)
-  returns (UpdateRSGroupConfigResponse);
+    returns (UpdateRSGroupConfigResponse);
 
   rpc GetLogEntries(LogRequest)
     returns(LogEntry);
+
+  rpc ListReplicationSinkServers(ListReplicationSinkServersRequest)
+    returns (ListReplicationSinkServersResponse);
 }
 
 // HBCK Service definitions.
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
index ac35caa..ec009cc 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
@@ -1782,4 +1782,20 @@ public interface MasterObserver {
   default void 
postHasUserPermissions(ObserverContext<MasterCoprocessorEnvironment> ctx,
       String userName, List<Permission> permissions) throws IOException {
   }
+
+  /**
+   * Called before getting servers for replication sink.
+   * @param ctx the coprocessor instance's environment
+   */
+  default void 
preListReplicationSinkServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
+    throws IOException {
+  }
+
+  /**
+   * Called after getting servers for replication sink.
+   * @param ctx the coprocessor instance's environment
+   */
+  default void 
postListReplicationSinkServers(ObserverContext<MasterCoprocessorEnvironment> 
ctx)
+    throws IOException {
+  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index e4bd3c5..d1a8280 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -3900,4 +3900,9 @@ public class HMaster extends HRegionServer implements 
MasterServices {
   public RSGroupInfoManager getRSGroupInfoManager() {
     return rsGroupInfoManager;
   }
+
+  @Override
+  public List<ServerName> listReplicationSinkServers() throws IOException {
+    return this.serverManager.getOnlineServersList();
+  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
index 01d1a62..f775eba 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
@@ -2038,4 +2038,22 @@ public class MasterCoprocessorHost
       }
     });
   }
+
+  public void preListReplicationSinkServers() throws IOException {
+    execOperation(coprocEnvironments.isEmpty() ? null : new 
MasterObserverOperation() {
+      @Override
+      public void call(MasterObserver observer) throws IOException {
+        observer.preListReplicationSinkServers(this);
+      }
+    });
+  }
+
+  public void postListReplicationSinkServers() throws IOException {
+    execOperation(coprocEnvironments.isEmpty() ? null : new 
MasterObserverOperation() {
+      @Override
+      public void call(MasterObserver observer) throws IOException {
+        observer.postListReplicationSinkServers(this);
+      }
+    });
+  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 37fc589..314a2d6 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -263,6 +263,8 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamesp
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
@@ -3375,4 +3377,23 @@ public class MasterRpcServices extends RSRpcServices 
implements
       .addAllBalancerDecision(balancerDecisions).build();
   }
 
+  public ListReplicationSinkServersResponse listReplicationSinkServers(
+    RpcController controller, ListReplicationSinkServersRequest request)
+    throws ServiceException {
+    ListReplicationSinkServersResponse.Builder builder =
+      ListReplicationSinkServersResponse.newBuilder();
+    try {
+      if (master.getMasterCoprocessorHost() != null) {
+        master.getMasterCoprocessorHost().preListReplicationSinkServers();
+      }
+      builder.addAllServerName(master.listReplicationSinkServers().stream()
+        .map(ProtobufUtil::toServerName).collect(Collectors.toList()));
+      if (master.getMasterCoprocessorHost() != null) {
+        master.getMasterCoprocessorHost().postListReplicationSinkServers();
+      }
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return builder.build();
+  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index 908d212..7b21289 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -553,4 +553,10 @@ public interface MasterServices extends Server {
    * @return The state of the load balancer, or false if the load balancer 
isn't defined.
    */
   boolean isBalancerOn();
+
+  /**
+   * Get a list of servers' addresses for replication sink.
+   * @return a list of servers' addresses
+   */
+  List<ServerName> listReplicationSinkServers() throws IOException;
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
index 5656497..48719b6 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hbase.replication;
 
+import static 
org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT;
+import static 
org.apache.hadoop.hbase.HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -28,21 +31,26 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
 import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.zookeeper.ZKListener;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKListener;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.AuthFailedException;
 import org.apache.zookeeper.KeeperException.ConnectionLossException;
@@ -52,6 +60,12 @@ import org.slf4j.LoggerFactory;
 
 import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
 
 /**
  * A {@link BaseReplicationEndpoint} for replication endpoints whose
@@ -63,6 +77,13 @@ public abstract class HBaseReplicationEndpoint extends 
BaseReplicationEndpoint
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HBaseReplicationEndpoint.class);
 
+  public static final String FETCH_SERVERS_USE_ZK_CONF_KEY =
+      "hbase.replication.fetch.servers.usezk";
+
+  public static final String FETCH_SERVERS_INTERVAL_CONF_KEY =
+      "hbase.replication.fetch.servers.interval";
+  public static final int DEFAULT_FETCH_SERVERS_INTERVAL = 10 * 60 * 1000; // 
10 mins
+
   private ZKWatcher zkw = null;
 
   protected Configuration conf;
@@ -93,6 +114,11 @@ public abstract class HBaseReplicationEndpoint extends 
BaseReplicationEndpoint
 
   private List<ServerName> sinkServers = new ArrayList<>(0);
 
+  private AsyncClusterConnection peerConnection;
+  private boolean fetchServersUseZk = false;
+  private FetchServersChore fetchServersChore;
+  private int shortOperationTimeout;
+
   /*
    * Some implementations of HBaseInterClusterReplicationEndpoint may require 
instantiate different
    * Connection implementations, or initialize it in a different way, so 
defining createConnection
@@ -122,6 +148,19 @@ public abstract class HBaseReplicationEndpoint extends 
BaseReplicationEndpoint
     if (zkw != null) {
       zkw.close();
     }
+    if (fetchServersChore != null) {
+      ChoreService choreService = ctx.getServer().getChoreService();
+      if (null != choreService) {
+        choreService.cancelChore(fetchServersChore);
+      }
+    }
+    if (peerConnection != null) {
+      try {
+        peerConnection.close();
+      } catch (IOException e) {
+        LOG.warn("Attempt to close peerConnection failed.", e);
+      }
+    }
   }
 
   /**
@@ -152,8 +191,27 @@ public abstract class HBaseReplicationEndpoint extends 
BaseReplicationEndpoint
   }
 
   @Override
-  protected void doStart() {
+  protected synchronized void doStart() {
+    this.shortOperationTimeout = ctx.getLocalConfiguration().getInt(
+        HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, 
DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
     try {
+      if 
(ctx.getLocalConfiguration().getBoolean(FETCH_SERVERS_USE_ZK_CONF_KEY, false)) {
+        fetchServersUseZk = true;
+      } else {
+        try {
+          if 
(ReplicationUtils.isPeerClusterSupportReplicationOffload(getPeerConnection())) {
+            fetchServersChore = new FetchServersChore(ctx.getServer(), this);
+            ctx.getServer().getChoreService().scheduleChore(fetchServersChore);
+            fetchServersUseZk = false;
+          } else {
+            fetchServersUseZk = true;
+          }
+        } catch (Throwable t) {
+          fetchServersUseZk = true;
+          LOG.warn("Peer {} try to fetch servers by admin failed. Using zk 
impl.",
+              ctx.getPeerId(), t);
+        }
+      }
       reloadZkWatcher();
       notifyStarted();
     } catch (IOException e) {
@@ -192,7 +250,9 @@ public abstract class HBaseReplicationEndpoint extends 
BaseReplicationEndpoint
     }
     zkw = new ZKWatcher(ctx.getConfiguration(),
         "connection to cluster: " + ctx.getPeerId(), this);
-    zkw.registerListener(new PeerRegionServerListener(this));
+    if (fetchServersUseZk) {
+      zkw.registerListener(new PeerRegionServerListener(this));
+    }
   }
 
   @Override
@@ -208,11 +268,46 @@ public abstract class HBaseReplicationEndpoint extends 
BaseReplicationEndpoint
   }
 
   /**
+   * Get the connection to peer cluster
+   * @return connection to peer cluster
+   * @throws IOException If anything goes wrong connecting
+   */
+  private synchronized AsyncClusterConnection getPeerConnection() throws 
IOException {
+    if (peerConnection == null) {
+      Configuration conf = ctx.getConfiguration();
+      peerConnection = 
ClusterConnectionFactory.createAsyncClusterConnection(conf, null,
+          UserProvider.instantiate(conf).getCurrent());
+    }
+    return peerConnection;
+  }
+
+  /**
+   * Get the list of all the servers that are responsible for replication sink
+   * from the specified peer master
+   * @return list of server addresses or an empty list if the slave is 
unavailable
+   */
+  protected List<ServerName> fetchSlavesAddresses() {
+    try {
+      AsyncClusterConnection peerConn = getPeerConnection();
+      ServerName master = FutureUtils.get(peerConn.getAdmin().getMaster());
+      MasterService.BlockingInterface masterStub = 
MasterService.newBlockingStub(
+        peerConn.getRpcClient()
+          .createBlockingRpcChannel(master, User.getCurrent(), 
shortOperationTimeout));
+      ListReplicationSinkServersResponse resp = masterStub
+        .listReplicationSinkServers(null, 
ListReplicationSinkServersRequest.newBuilder().build());
+      return ProtobufUtil.toServerNameList(resp.getServerNameList());
+    } catch (ServiceException | IOException e) {
+      LOG.error("Peer {} fetches servers failed", ctx.getPeerId(), e);
+    }
+    return Collections.emptyList();
+  }
+
+  /**
    * Get the list of all the region servers from the specified peer
    *
    * @return list of region server addresses or an empty list if the slave is 
unavailable
    */
-  protected List<ServerName> fetchSlavesAddresses() {
+  protected List<ServerName> fetchSlavesAddressesByZK() {
     List<String> children = null;
     try {
       children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, 
zkw.getZNodePaths().rsZNode);
@@ -233,7 +328,12 @@ public abstract class HBaseReplicationEndpoint extends 
BaseReplicationEndpoint
   }
 
   protected synchronized void chooseSinks() {
-    List<ServerName> slaveAddresses = fetchSlavesAddresses();
+    List<ServerName> slaveAddresses = Collections.emptyList();
+    if (fetchServersUseZk) {
+      slaveAddresses = fetchSlavesAddressesByZK();
+    } else {
+      slaveAddresses = fetchSlavesAddresses();
+    }
     if (slaveAddresses.isEmpty()) {
       LOG.warn("No sinks available at peer. Will not be able to replicate");
     }
@@ -264,6 +364,14 @@ public abstract class HBaseReplicationEndpoint extends 
BaseReplicationEndpoint
     return createSinkPeer(serverName);
   }
 
+  private SinkPeer createSinkPeer(ServerName serverName) throws IOException {
+    if (ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) {
+      return new ReplicationServerSinkPeer(serverName, 
conn.getReplicationServerAdmin(serverName));
+    } else {
+      return new RegionServerSinkPeer(serverName, 
conn.getRegionServerAdmin(serverName));
+    }
+  }
+
   /**
    * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it
    * failed). If a single SinkPeer is reported as bad more than
@@ -373,11 +481,23 @@ public abstract class HBaseReplicationEndpoint extends 
BaseReplicationEndpoint
     }
   }
 
-  private SinkPeer createSinkPeer(ServerName serverName) throws IOException {
-    if (ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) {
-      return new ReplicationServerSinkPeer(serverName, 
conn.getReplicationServerAdmin(serverName));
-    } else {
-      return new RegionServerSinkPeer(serverName, 
conn.getRegionServerAdmin(serverName));
+  /**
+   * Chore that will fetch the list of servers from peer master.
+   */
+  public static class FetchServersChore extends ScheduledChore {
+
+    private HBaseReplicationEndpoint endpoint;
+
+    public FetchServersChore(Server server, HBaseReplicationEndpoint endpoint) 
{
+      super("Peer-" + endpoint.ctx.getPeerId() + "-FetchServersChore", server,
+        server.getConfiguration()
+          .getInt(FETCH_SERVERS_INTERVAL_CONF_KEY, 
DEFAULT_FETCH_SERVERS_INTERVAL));
+      this.endpoint = endpoint;
+    }
+
+    @Override
+    protected void chore() {
+      endpoint.chooseSinks();
     }
   }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 879c604..2bf575d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -343,9 +343,9 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
         Threads.setDaemonThreadRunning(
             walReader, Thread.currentThread().getName()
             + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
-          (t,e) -> this.uncaughtException(t, e, this.manager, 
this.getPeerId()));
+          (t,e) -> this.uncaughtException(t, e, null, this.getPeerId()));
         worker.setWALReader(walReader);
-        worker.startup((t,e) -> this.uncaughtException(t, e, this.manager, 
this.getPeerId()));
+        worker.startup((t,e) -> this.uncaughtException(t, e, null, 
this.getPeerId()));
         return worker;
       }
     });
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index eb558e0..a1983e7 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -369,6 +369,6 @@ public class ReplicationSourceShipper extends Thread {
 
     LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication 
WAL Readers.",
       totalToDecrement.longValue());
-    
source.getSourceManager().getTotalBufferUsed().addAndGet(-totalToDecrement.longValue());
+    
source.controller.getTotalBufferUsed().addAndGet(-totalToDecrement.longValue());
   }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index 7c65005..947ba5d 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -507,4 +507,9 @@ public class MockNoopMasterServices implements 
MasterServices {
   public boolean isBalancerOn() {
     return false;
   }
+
+  @Override
+  public List<ServerName> listReplicationSinkServers() throws IOException {
+    return null;
+  }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
index 4182eaf..6765794 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
@@ -199,6 +199,11 @@ public class TestHBaseReplicationEndpoint {
     }
 
     @Override
+    public List<ServerName> fetchSlavesAddressesByZK() {
+      return regionServers;
+    }
+
+    @Override
     public boolean replicate(ReplicateContext replicateContext) {
       return false;
     }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java
new file mode 100644
index 0000000..9ceacee
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static 
org.apache.hadoop.hbase.coprocessor.CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.MasterObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationFetchServers extends TestReplicationBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestReplicationFetchServers.class);
+
+  private static AtomicBoolean fetchFlag = new AtomicBoolean(false);
+
+  public static class MyObserver implements MasterCoprocessor, MasterObserver {
+
+    @Override
+    public Optional<MasterObserver> getMasterObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
+    public void 
postListReplicationSinkServers(ObserverContext<MasterCoprocessorEnvironment> 
ctx) {
+      fetchFlag.set(true);
+    }
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    CONF2.set(MASTER_COPROCESSOR_CONF_KEY, MyObserver.class.getName());
+    TestReplicationBase.setUpBeforeClass();
+  }
+
+  @Before
+  public void beforeMethod() {
+    fetchFlag.set(false);
+  }
+
+  @Test
+  public void testMasterListReplicationPeerServers() throws IOException, 
ServiceException {
+    AsyncClusterConnection conn = UTIL2.getAsyncConnection();
+    ServerName master = UTIL2.getAdmin().getMaster();
+    MasterService.BlockingInterface masterStub = MasterService.newBlockingStub(
+        conn.getRpcClient().createBlockingRpcChannel(master, 
User.getCurrent(), 1000));
+    ListReplicationSinkServersResponse resp = 
masterStub.listReplicationSinkServers(
+        null, ListReplicationSinkServersRequest.newBuilder().build());
+    List<ServerName> servers = 
ProtobufUtil.toServerNameList(resp.getServerNameList());
+    assertFalse(servers.isEmpty());
+    assertTrue(fetchFlag.get());
+  }
+
+  @Test
+  public void testPutData() throws IOException {
+    htable1.put(new Put(row).addColumn(famName, famName, row));
+    UTIL2.waitFor(30000L, () -> !htable2.get(new Get(row)).isEmpty());
+    assertTrue(fetchFlag.get());
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java
index 1538fa3..cfc9fa3 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java
@@ -118,6 +118,10 @@ public class TestGlobalReplicationThrottler {
 
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
+    Admin admin1 = utility1.getAdmin();
+    admin1.removeReplicationPeer("peer1");
+    admin1.removeReplicationPeer("peer2");
+    admin1.removeReplicationPeer("peer3");
     utility2.shutdownMiniCluster();
     utility1.shutdownMiniCluster();
   }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
index ee1ae5f..c676e30 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
@@ -256,11 +256,13 @@ public class TestRegionReplicaReplicationEndpointNoMaster 
{
 
     ReplicationEndpoint.Context context = 
mock(ReplicationEndpoint.Context.class);
     when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
+    when(context.getLocalConfiguration()).thenReturn(HTU.getConfiguration());
     when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
     when(context.getServer()).thenReturn(rs0);
     when(context.getTableDescriptors()).thenReturn(rs0.getTableDescriptors());
     replicator.init(context);
     replicator.startAsync();
+    HTU.waitFor(30000, replicator::isRunning);
 
     //load some data to primary
     HTU.loadNumericRows(table, f, 0, 1000);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 5817c00..1d9081a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -284,7 +284,7 @@ public class TestReplicationSource {
     ReplicationPeer mockPeer = mock(ReplicationPeer.class);
     Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
     Configuration testConf = HBaseConfiguration.create();
-    source.init(testConf, null, mockManager, null, mockPeer, null,
+    source.init(testConf, null, null, mockManager, null, mockPeer, null,
       "testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class));
     ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null,
       conf, null, 0, null, source);
@@ -310,7 +310,7 @@ public class TestReplicationSource {
     reader.addEntryToBatch(batch, mockEntry);
     reader.entryBatchQueue.put(batch);
     source.terminate("test");
-    assertEquals(0, source.getSourceManager().getTotalBufferUsed().get());
+    assertEquals(0, source.controller.getTotalBufferUsed().get());
   }
 
   /**

Reply via email to