This is an automated email from the ASF dual-hosted git repository. sunxin pushed a commit to branch HBASE-24666 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 1553b39b8135f0a8fffe1a9089b8bc4f51d9c945 Author: XinSun <ddu...@gmail.com> AuthorDate: Mon Nov 23 11:01:55 2020 +0800 HBASE-25113 [testing] HBaseCluster support ReplicationServer for UTs (#2662) Signed-off-by: Guanghao Zhang <zg...@apache.org> --- .../org/apache/hadoop/hbase/LocalHBaseCluster.java | 63 ++++++++++++++++++- .../hbase/replication/HReplicationServer.java | 13 ++++ .../apache/hadoop/hbase/util/JVMClusterUtil.java | 57 +++++++++++++++++- .../apache/hadoop/hbase/HBaseTestingUtility.java | 8 +-- .../org/apache/hadoop/hbase/MiniHBaseCluster.java | 70 ++++++++++++++++++---- .../hadoop/hbase/StartMiniClusterOption.java | 24 ++++++-- .../replication/TestReplicationServerSink.java | 44 +++++++------- hbase-server/src/test/resources/hbase-site.xml | 7 +++ 8 files changed, 242 insertions(+), 44 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java index f4847b9..24b658f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java @@ -32,9 +32,11 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.replication.HReplicationServer; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hbase.util.JVMClusterUtil.ReplicationServerThread; import org.apache.hadoop.hbase.util.Threads; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -60,7 +62,10 @@ import org.slf4j.LoggerFactory; public class LocalHBaseCluster { private static final Logger LOG = LoggerFactory.getLogger(LocalHBaseCluster.class); private final List<JVMClusterUtil.MasterThread> masterThreads = new CopyOnWriteArrayList<>(); - private final List<JVMClusterUtil.RegionServerThread> regionThreads = new CopyOnWriteArrayList<>(); + private final List<JVMClusterUtil.RegionServerThread> regionThreads = + new CopyOnWriteArrayList<>(); + private final List<JVMClusterUtil.ReplicationServerThread> replicationThreads = + new CopyOnWriteArrayList<>(); private final static int DEFAULT_NO = 1; /** local mode */ public static final String LOCAL = "local"; @@ -259,6 +264,26 @@ public class LocalHBaseCluster { }); } + @SuppressWarnings("unchecked") + public JVMClusterUtil.ReplicationServerThread addReplicationServer( + Configuration config, final int index) throws IOException { + // Create each replication server with its own Configuration instance so each has + // its Connection instance rather than share (see HBASE_INSTANCES down in + // the guts of ConnectionManager). + JVMClusterUtil.ReplicationServerThread rst = + JVMClusterUtil.createReplicationServerThread(config, index); + this.replicationThreads.add(rst); + return rst; + } + + public JVMClusterUtil.ReplicationServerThread addReplicationServer( + final Configuration config, final int index, User user) + throws IOException, InterruptedException { + return user.runAs( + (PrivilegedExceptionAction<ReplicationServerThread>) () -> addReplicationServer(config, + index)); + } + /** * @param serverNumber * @return region server @@ -290,6 +315,40 @@ public class LocalHBaseCluster { } /** + * @param serverNumber replication server number + * @return replication server + */ + public HReplicationServer getReplicationServer(int serverNumber) { + return replicationThreads.get(serverNumber).getReplicationServer(); + } + + /** + * @return Read-only list of replication server threads. + */ + public List<JVMClusterUtil.ReplicationServerThread> getReplicationServers() { + return Collections.unmodifiableList(this.replicationThreads); + } + + /** + * @return List of running servers (Some servers may have been killed or + * aborted during lifetime of cluster; these servers are not included in this + * list). + */ + public List<JVMClusterUtil.ReplicationServerThread> getLiveReplicationServers() { + List<JVMClusterUtil.ReplicationServerThread> liveServers = new ArrayList<>(); + List<ReplicationServerThread> list = getReplicationServers(); + for (JVMClusterUtil.ReplicationServerThread rst: list) { + if (rst.isAlive()) { + liveServers.add(rst); + } + else { + LOG.info("Not alive {}", rst.getName()); + } + } + return liveServers; + } + + /** * @return the Configuration used by this LocalHBaseCluster */ public Configuration getConfiguration() { @@ -430,7 +489,7 @@ public class LocalHBaseCluster { * Start the cluster. */ public void startup() throws IOException { - JVMClusterUtil.startup(this.masterThreads, this.regionThreads); + JVMClusterUtil.startup(this.masterThreads, this.regionThreads, this.replicationThreads); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java index 4c8bb11..4c7ce0a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java @@ -445,6 +445,19 @@ public class HReplicationServer extends Thread implements Server, ReplicationSou return this.stopped; } + public void waitForServerOnline(){ + while (!isStopped() && !isOnline()) { + synchronized (online) { + try { + online.wait(msgInterval); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + } + } + } + /** * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to * be hooked up to WAL. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java index cc0f49a..9568d80 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.replication.HReplicationServer; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,6 +73,33 @@ public class JVMClusterUtil { } /** + * Datastructure to hold ReplicationServer Thread and ReplicationServer instance + */ + public static class ReplicationServerThread extends Thread { + private final HReplicationServer replicationServer; + + public ReplicationServerThread(final HReplicationServer r, final int index) { + super(r, "ReplicationServer:" + index + ";" + r.getServerName().toShortString()); + this.replicationServer = r; + } + + /** + * @return the replication server + */ + public HReplicationServer getReplicationServer() { + return this.replicationServer; + } + + /** + * Block until the replication server has come online, indicating it is ready + * to be used. + */ + public void waitForServerOnline() { + replicationServer.waitForServerOnline(); + } + } + + /** * Creates a {@link RegionServerThread}. * Call 'start' on the returned thread to make it run. * @param c Configuration to use. @@ -98,6 +126,24 @@ public class JVMClusterUtil { return new JVMClusterUtil.RegionServerThread(server, index); } + /** + * Creates a {@link ReplicationServerThread}. + * Call 'start' on the returned thread to make it run. + * @param c Configuration to use. + * @param index Used distinguishing the object returned. + * @throws IOException + * @return Replication server added. + */ + public static JVMClusterUtil.ReplicationServerThread createReplicationServerThread( + final Configuration c, final int index) throws IOException { + HReplicationServer server; + try { + server = new HReplicationServer(c); + } catch (Exception e) { + throw new IOException(e); + } + return new JVMClusterUtil.ReplicationServerThread(server, index); + } /** * Datastructure to hold Master Thread and Master instance @@ -122,7 +168,7 @@ public class JVMClusterUtil { * @param c Configuration to use. * @param hmc Class to create. * @param index Used distinguishing the object returned. - * @throws IOException + * @throws IOException exception * @return Master added. */ public static JVMClusterUtil.MasterThread createMasterThread(final Configuration c, @@ -165,7 +211,8 @@ public class JVMClusterUtil { * @return Address to use contacting primary master. */ public static String startup(final List<JVMClusterUtil.MasterThread> masters, - final List<JVMClusterUtil.RegionServerThread> regionservers) throws IOException { + final List<JVMClusterUtil.RegionServerThread> regionservers, + final List<JVMClusterUtil.ReplicationServerThread> replicationServers) throws IOException { // Implementation note: This method relies on timed sleeps in a loop. It's not great, and // should probably be re-written to use actual synchronization objects, but it's ok for now @@ -193,6 +240,12 @@ public class JVMClusterUtil { } } + if (replicationServers != null) { + for (JVMClusterUtil.ReplicationServerThread t: replicationServers) { + t.start(); + } + } + // Wait for an active master to be initialized (implies being master) // with this, when we return the cluster is complete final int initTimeout = configuration != null ? Integer.parseInt( diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 29e8883..36480f4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -1115,8 +1115,8 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility { Configuration c = new Configuration(this.conf); TraceUtil.initTracer(c); this.hbaseCluster = new MiniHBaseCluster(c, option.getNumMasters(), - option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(), - option.getMasterClass(), option.getRsClass()); + option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(), + option.getNumReplicationServers(), option.getMasterClass(), option.getRsClass()); // Populate the master address configuration from mini cluster configuration. conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c)); // Don't leave here till we've done a successful scan of the hbase:meta @@ -1241,8 +1241,8 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility { closeConnection(); this.hbaseCluster = new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumAlwaysStandByMasters(), - option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(), - option.getRsClass()); + option.getNumRegionServers(), option.getRsPorts(), option.getNumReplicationServers(), + option.getMasterClass(), option.getRsClass()); // Don't leave here till we've done a successful scan of the hbase:meta Connection conn = ConnectionFactory.createConnection(this.conf); Table t = conn.getTable(TableName.META_TABLE_NAME); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java index 4d40ca4..2a7968a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java @@ -32,11 +32,13 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.replication.HReplicationServer; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.test.MetricsAssertHelper; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hbase.util.JVMClusterUtil.ReplicationServerThread; import org.apache.hadoop.hbase.util.Threads; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -86,10 +88,10 @@ public class MiniHBaseCluster extends HBaseCluster { * @param numRegionServers initial number of region servers to start. */ public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers, - Class<? extends HMaster> masterClass, - Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) + Class<? extends HMaster> masterClass, + Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) throws IOException, InterruptedException { - this(conf, numMasters, 0, numRegionServers, null, masterClass, regionserverClass); + this(conf, numMasters, 0, numRegionServers, null, 0, masterClass, regionserverClass); } /** @@ -97,20 +99,22 @@ public class MiniHBaseCluster extends HBaseCluster { * restart where for sure the regionservers come up on same address+port (but * just with different startcode); by default mini hbase clusters choose new * arbitrary ports on each cluster start. + * @param numReplicationServers initial number of replication servers to start. * @throws IOException * @throws InterruptedException */ public MiniHBaseCluster(Configuration conf, int numMasters, int numAlwaysStandByMasters, - int numRegionServers, List<Integer> rsPorts, Class<? extends HMaster> masterClass, - Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) + int numRegionServers, List<Integer> rsPorts, int numReplicationServers, + Class<? extends HMaster> masterClass, + Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) throws IOException, InterruptedException { super(conf); // Hadoop 2 CompatibilityFactory.getInstance(MetricsAssertHelper.class).init(); - init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, masterClass, - regionserverClass); + init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, numReplicationServers, + masterClass, regionserverClass); this.initialClusterStatus = getClusterMetrics(); } @@ -227,7 +231,8 @@ public class MiniHBaseCluster extends HBaseCluster { } private void init(final int nMasterNodes, final int numAlwaysStandByMasters, - final int nRegionNodes, List<Integer> rsPorts, Class<? extends HMaster> masterClass, + final int nRegionNodes, List<Integer> rsPorts, int numReplicationServers, + Class<? extends HMaster> masterClass, Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) throws IOException, InterruptedException { try { @@ -248,11 +253,17 @@ public class MiniHBaseCluster extends HBaseCluster { if (rsPorts != null) { rsConf.setInt(HConstants.REGIONSERVER_PORT, rsPorts.get(i)); } - User user = HBaseTestingUtility.getDifferentUser(rsConf, - ".hfs."+index++); + User user = HBaseTestingUtility.getDifferentUser(rsConf, ".hfs." + index++); hbaseCluster.addRegionServer(rsConf, i, user); } + // manually add the replication servers as other users + for (int i = 0; i < numReplicationServers; i++) { + Configuration rsConf = HBaseConfiguration.create(conf); + User user = HBaseTestingUtility.getDifferentUser(rsConf, ".hfs." + index++); + hbaseCluster.addReplicationServer(rsConf, i, user); + } + hbaseCluster.startup(); } catch (IOException e) { shutdown(); @@ -791,7 +802,7 @@ public class MiniHBaseCluster extends HBaseCluster { /** * Grab a numbered region server of your choice. - * @param serverNumber + * @param serverNumber region server number * @return region server */ public HRegionServer getRegionServer(int serverNumber) { @@ -805,6 +816,43 @@ public class MiniHBaseCluster extends HBaseCluster { .findFirst().orElse(null); } + /** + * @return Number of live replication servers in the cluster currently. + */ + public int getNumLiveReplicationServers() { + return this.hbaseCluster.getLiveReplicationServers().size(); + } + + /** + * @return List of replication server threads. + */ + public List<JVMClusterUtil.ReplicationServerThread> getReplicationServerThreads() { + return this.hbaseCluster.getReplicationServers(); + } + + /** + * @return List of live replication server threads (skips the aborted and the killed) + */ + public List<JVMClusterUtil.ReplicationServerThread> getLiveReplicationServerThreads() { + return this.hbaseCluster.getLiveReplicationServers(); + } + + /** + * Grab a numbered replication server of your choice. + * @param serverNumber + * @return replication server + */ + public HReplicationServer getReplicationServer(int serverNumber) { + return hbaseCluster.getReplicationServer(serverNumber); + } + + public HReplicationServer getReplicationServer(ServerName serverName) { + return hbaseCluster.getReplicationServers().stream() + .map(ReplicationServerThread::getReplicationServer) + .filter(r -> r.getServerName().equals(serverName)) + .findFirst().orElse(null); + } + public List<HRegion> getRegions(byte[] tableName) { return getRegions(TableName.valueOf(tableName)); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/StartMiniClusterOption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/StartMiniClusterOption.java index 7a9bd68..0aa35ed 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/StartMiniClusterOption.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/StartMiniClusterOption.java @@ -75,6 +75,10 @@ public final class StartMiniClusterOption { * The class to use as HRegionServer, or null for default. */ private Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass; + /** + * Number of replication servers to start up. + */ + private final int numReplicationServers; /** * Number of datanodes. Used to create mini DSF cluster. Surpassed by {@link #dataNodeHosts} size. @@ -109,7 +113,8 @@ public final class StartMiniClusterOption { */ private StartMiniClusterOption(int numMasters, int numAlwaysStandByMasters, Class<? extends HMaster> masterClass, int numRegionServers, List<Integer> rsPorts, - Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass, int numDataNodes, + Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass, + int numReplicationServers, int numDataNodes, String[] dataNodeHosts, int numZkServers, boolean createRootDir, boolean createWALDir) { this.numMasters = numMasters; this.numAlwaysStandByMasters = numAlwaysStandByMasters; @@ -117,6 +122,7 @@ public final class StartMiniClusterOption { this.numRegionServers = numRegionServers; this.rsPorts = rsPorts; this.rsClass = rsClass; + this.numReplicationServers = numReplicationServers; this.numDataNodes = numDataNodes; this.dataNodeHosts = dataNodeHosts; this.numZkServers = numZkServers; @@ -148,6 +154,10 @@ public final class StartMiniClusterOption { return rsClass; } + public int getNumReplicationServers() { + return numReplicationServers; + } + public int getNumDataNodes() { return numDataNodes; } @@ -196,6 +206,7 @@ public final class StartMiniClusterOption { private Class<? extends HMaster> masterClass = null; private int numRegionServers = 1; private List<Integer> rsPorts = null; + private int numReplicationServers; private Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass = null; private int numDataNodes = 1; private String[] dataNodeHosts = null; @@ -210,9 +221,9 @@ public final class StartMiniClusterOption { if (dataNodeHosts != null && dataNodeHosts.length != 0) { numDataNodes = dataNodeHosts.length; } - return new StartMiniClusterOption(numMasters,numAlwaysStandByMasters, masterClass, - numRegionServers, rsPorts, rsClass, numDataNodes, dataNodeHosts, numZkServers, - createRootDir, createWALDir); + return new StartMiniClusterOption(numMasters, numAlwaysStandByMasters, masterClass, + numRegionServers, rsPorts, rsClass, numReplicationServers, + numDataNodes, dataNodeHosts, numZkServers, createRootDir, createWALDir); } public Builder numMasters(int numMasters) { @@ -269,6 +280,11 @@ public final class StartMiniClusterOption { this.createWALDir = createWALDir; return this; } + + public Builder numReplicationServers(int numReplicationServers) { + this.numReplicationServers = numReplicationServers; + return this; + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSink.java index bad1dc1..a1cbebb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSink.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSink.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.StartMiniClusterOption; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.AsyncClusterConnection; import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin; @@ -77,8 +78,8 @@ public class TestReplicationServerSink { private static HMaster MASTER; - private static HReplicationServer replicationServer; - private static ServerName replicationServerName; + private static HReplicationServer REPLICATION_SERVER; + private static ServerName REPLICATION_SERVER_NAME; private static Path baseNamespaceDir; private static Path hfileArchiveDir; @@ -93,9 +94,13 @@ public class TestReplicationServerSink { public static void beforeClass() throws Exception { CONF.setLong(HBASE_CLIENT_OPERATION_TIMEOUT, 1000); CONF.setLong(ONLINE_SERVER_REFRESH_INTERVAL, 10000); - TEST_UTIL.startMiniCluster(); + CONF.setBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY, true); + TEST_UTIL.startMiniCluster(StartMiniClusterOption.builder().numReplicationServers(1).build()); MASTER = TEST_UTIL.getMiniHBaseCluster().getMaster(); TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster(); + REPLICATION_SERVER = TEST_UTIL.getMiniHBaseCluster().getReplicationServerThreads().get(0) + .getReplicationServer(); + REPLICATION_SERVER_NAME = REPLICATION_SERVER.getServerName(); Path rootDir = CommonFSUtils.getRootDir(CONF); baseNamespaceDir = new Path(rootDir, new Path(HConstants.BASE_NAMESPACE_DIR)); @@ -110,11 +115,6 @@ public class TestReplicationServerSink { @Before public void before() throws Exception { - replicationServer = new HReplicationServer(CONF); - replicationServer.start(); - TEST_UTIL.waitFor(60000, () -> replicationServer.isOnline()); - replicationServerName = replicationServer.getServerName(); - TEST_UTIL.createTable(TABLENAME, FAMILY); TEST_UTIL.waitTableAvailable(TABLENAME); } @@ -122,11 +122,6 @@ public class TestReplicationServerSink { @After public void after() throws IOException { TEST_UTIL.deleteTableIfAny(TABLENAME); - if (!replicationServer.isStopped()) { - replicationServer.stop("test"); - } - replicationServer = null; - replicationServerName = null; } /** @@ -137,10 +132,10 @@ public class TestReplicationServerSink { AsyncClusterConnection conn = TEST_UTIL.getHBaseCluster().getMaster().getAsyncClusterConnection(); AsyncReplicationServerAdmin replAdmin = - conn.getReplicationServerAdmin(replicationServerName); + conn.getReplicationServerAdmin(REPLICATION_SERVER_NAME); ReplicationServerSinkPeer sinkPeer = - new ReplicationServerSinkPeer(replicationServerName, replAdmin); + new ReplicationServerSinkPeer(REPLICATION_SERVER_NAME, replAdmin); replicateWALEntryAndVerify(sinkPeer); } @@ -177,23 +172,30 @@ public class TestReplicationServerSink { ReplicationServerManager replicationServerManager = MASTER.getReplicationServerManager(); assertNotNull(replicationServerManager); TEST_UTIL.waitFor(60000, () -> !replicationServerManager.getOnlineServers().isEmpty() - && null != replicationServerManager.getServerMetrics(replicationServerName)); + && null != replicationServerManager.getServerMetrics(REPLICATION_SERVER_NAME)); // put data via replication server testReplicateWAL(); - TEST_UTIL.waitFor(60000, () -> replicationServer.rpcServices.requestCount.sum() > 0 - && replicationServer.rpcServices.requestCount.sum() == replicationServerManager - .getServerMetrics(replicationServerName).getRequestCount()); + TEST_UTIL.waitFor(60000, () -> REPLICATION_SERVER.rpcServices.requestCount.sum() > 0 + && REPLICATION_SERVER.rpcServices.requestCount.sum() == replicationServerManager + .getServerMetrics(REPLICATION_SERVER_NAME).getRequestCount()); } @Test public void testReplicationServerExpire() throws Exception { + int initialNum = TEST_UTIL.getMiniHBaseCluster().getNumLiveReplicationServers(); + HReplicationServer replicationServer = new HReplicationServer(CONF); + replicationServer.start(); + ServerName replicationServerName = replicationServer.getServerName(); + ReplicationServerManager replicationServerManager = MASTER.getReplicationServerManager(); - TEST_UTIL.waitFor(60000, () -> !replicationServerManager.getOnlineServers().isEmpty() + TEST_UTIL.waitFor(60000, () -> + initialNum + 1 == replicationServerManager.getOnlineServers().size() && null != replicationServerManager.getServerMetrics(replicationServerName)); replicationServer.stop("test"); - TEST_UTIL.waitFor(180000, 1000, replicationServerManager.getOnlineServers()::isEmpty); + TEST_UTIL.waitFor(180000, 1000, () -> + initialNum == replicationServerManager.getOnlineServers().size()); assertNull(replicationServerManager.getServerMetrics(replicationServerName)); } } diff --git a/hbase-server/src/test/resources/hbase-site.xml b/hbase-server/src/test/resources/hbase-site.xml index 5e64bfc..36187a3 100644 --- a/hbase-server/src/test/resources/hbase-site.xml +++ b/hbase-server/src/test/resources/hbase-site.xml @@ -90,6 +90,13 @@ </description> </property> <property> + <name>hbase.replicationserver.port</name> + <value>0</value> + <description>Always have replicationservers come up on port '0' so we don't clash over + default ports. + </description> + </property> + <property> <name>hbase.ipc.client.fallback-to-simple-auth-allowed</name> <value>true</value> </property>