This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch branch-3 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 25fe3698758fd3c9d569ead774072ead0db05a19 Author: Duo Zhang <[email protected]> AuthorDate: Thu Mar 13 22:02:59 2025 +0800 HBASE-29169 Removing the legacy 'region_replica_replication' peer will crash region servers while rolling upgrading (#6772) Signed-off-by: Nihal Jain <[email protected]> (cherry picked from commit 46fcd3c1ec8c17826f8c36186de77d33f4609cab) --- .../protobuf/server/master/MasterProcedure.proto | 1 + .../ZKReplicationQueueStorageForMigration.java | 46 ++++++++++++++-- .../replication/TestZKReplicationQueueStorage.java | 23 ++++++-- ...rateReplicationQueueFromZkToTableProcedure.java | 62 ++++++++++++++++------ .../master/replication/ReplicationPeerManager.java | 52 +++++++++++------- .../replication/TestMigrateReplicationQueue.java | 43 +++++++++++++-- ...StartupWithLegacyRegionReplicationEndpoint.java | 6 +-- 7 files changed, 183 insertions(+), 50 deletions(-) diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto index 81d16b2861c..e52a46ab358 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto @@ -787,6 +787,7 @@ enum MigrateReplicationQueueFromZkToTableState { MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING = 5; MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER = 6; MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER = 7; + MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_CLEAN_UP = 8; } message MigrateReplicationQueueFromZkToTableStateData { diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorageForMigration.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorageForMigration.java index 22cc1314522..c44d3d62ce4 100644 --- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorageForMigration.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorageForMigration.java @@ -32,8 +32,11 @@ import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZNodePaths; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.base.Splitter; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; /** * Just retain a small set of the methods for the old zookeeper based replication queue storage, for @@ -42,6 +45,9 @@ import org.apache.hbase.thirdparty.com.google.common.base.Splitter; @InterfaceAudience.Private public class ZKReplicationQueueStorageForMigration extends ZKReplicationStorageBase { + private static final Logger LOG = + LoggerFactory.getLogger(ZKReplicationQueueStorageForMigration.class); + public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY = "zookeeper.znode.replication.hfile.refs"; public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs"; @@ -123,6 +129,8 @@ public class ZKReplicationQueueStorageForMigration extends ZKReplicationStorageB return getFileNode(getQueueNode(serverName, queueId), fileName); } + static final String REGION_REPLICA_REPLICATION_PEER = "region_replica_replication"; + @SuppressWarnings("unchecked") public MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> listAllQueues() throws KeeperException { @@ -136,25 +144,57 @@ public class ZKReplicationQueueStorageForMigration extends ZKReplicationStorageB private ServerName previousServerName; + private boolean hasRegionReplicaReplicationQueue; + + private void cleanupQueuesWithoutRegionReplicaReplication(ServerName serverName) + throws Exception { + String rsZNode = 
getRsNode(serverName); + List<String> queueIdList = ZKUtil.listChildrenNoWatch(zookeeper, rsZNode); + if (CollectionUtils.isEmpty(queueIdList)) { + return; + } + for (String queueId : queueIdList) { + ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); + if (!queueInfo.getPeerId().equals(REGION_REPLICA_REPLICATION_PEER)) { + ZKUtil.deleteNodeRecursively(zookeeper, getQueueNode(serverName, queueId)); + } + } + } + @Override public Pair<ServerName, List<ZkReplicationQueueData>> next() throws Exception { if (previousServerName != null) { - ZKUtil.deleteNodeRecursively(zookeeper, getRsNode(previousServerName)); + if (hasRegionReplicaReplicationQueue) { + // if there are region_replica_replication queues, we can not delete it, just delete + // other queues, see HBASE-29169. + cleanupQueuesWithoutRegionReplicaReplication(previousServerName); + } else { + ZKUtil.deleteNodeRecursively(zookeeper, getRsNode(previousServerName)); + } } if (!iter.hasNext()) { - ZKUtil.deleteNodeRecursively(zookeeper, queuesZNode); + // If there are region_replica_replication queues then we can not delete the data right + // now, otherwise we may crash the old region servers, see HBASE-29169. 
+ // The migration procedure has a special step to cleanup everything return null; } + hasRegionReplicaReplicationQueue = false; String replicator = iter.next(); ServerName serverName = ServerName.parseServerName(replicator); previousServerName = serverName; List<String> queueIdList = ZKUtil.listChildrenNoWatch(zookeeper, getRsNode(serverName)); - if (queueIdList == null || queueIdList.isEmpty()) { + if (CollectionUtils.isEmpty(queueIdList)) { return Pair.newPair(serverName, Collections.emptyList()); } List<ZkReplicationQueueData> queueDataList = new ArrayList<>(queueIdList.size()); for (String queueIdStr : queueIdList) { ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueIdStr); + if (queueInfo.getPeerId().equals(REGION_REPLICA_REPLICATION_PEER)) { + // we do not need to migrate the data for this queue, skip + LOG.debug("Found region replica replication queue {}, skip", queueInfo); + hasRegionReplicaReplicationQueue = true; + continue; + } ReplicationQueueId queueId; if (queueInfo.getDeadRegionServers().isEmpty()) { queueId = new ReplicationQueueId(serverName, queueInfo.getPeerId()); diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java index e38b7b134e9..b6438667574 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java @@ -125,6 +125,11 @@ public class TestZKReplicationQueueStorage { ZKUtil.createWithParents(zk, wal); } } + if (i % 2 == 0) { + // add a region_replica_replication znode + ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(rsZNode, + ZKReplicationQueueStorageForMigration.REGION_REPLICA_REPLICATION_PEER)); + } } ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(storage.getQueuesZNode(), 
deadServer.toString())); @@ -218,8 +223,20 @@ public class TestZKReplicationQueueStorage { Pair<ServerName, List<ZkReplicationQueueData>> pair = iter.next(); assertNotNull(pair); if (previousServerName != null) { - assertEquals(-1, ZKUtil.checkExists(zk, - ZNodePaths.joinZNode(storage.getQueuesZNode(), previousServerName.toString()))); + int index = previousServerName.equals(deadServer) + ? -1 + : Integer + .parseInt(Iterables.getLast(Splitter.on('-').split(previousServerName.getHostname()))); + if (index % 2 == 0) { + List<String> children = ZKUtil.listChildrenNoWatch(zk, + ZNodePaths.joinZNode(storage.getQueuesZNode(), previousServerName.toString())); + assertEquals(1, children.size()); + assertEquals(ZKReplicationQueueStorageForMigration.REGION_REPLICA_REPLICATION_PEER, + children.get(0)); + } else { + assertEquals(-1, ZKUtil.checkExists(zk, + ZNodePaths.joinZNode(storage.getQueuesZNode(), previousServerName.toString()))); + } } ServerName sn = pair.getFirst(); previousServerName = sn; @@ -258,7 +275,7 @@ public class TestZKReplicationQueueStorage { } } assertNull(iter.next()); - assertEquals(-1, ZKUtil.checkExists(zk, storage.getQueuesZNode())); + assertEquals(nServers / 2, ZKUtil.listChildrenNoWatch(zk, storage.getQueuesZNode()).size()); } @Test diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java index cff1b387936..f1b4cfd249f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.master.replication; +import static 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_CLEAN_UP; import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER; import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER; import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER; @@ -42,9 +43,11 @@ import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration; import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; @@ -52,6 +55,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableStateData; @@ -166,6 +170,20 
@@ public class MigrateReplicationQueueFromZkToTableProcedure resetRetry(); } + private void cleanup(MasterProcedureEnv env) throws ProcedureSuspendedException { + ZKReplicationQueueStorageForMigration oldStorage = new ZKReplicationQueueStorageForMigration( + env.getMasterServices().getZooKeeper(), env.getMasterConfiguration()); + try { + oldStorage.deleteAllData(); + env.getReplicationPeerManager().deleteLegacyRegionReplicaReplicationPeer(); + } catch (KeeperException | ReplicationException e) { + throw suspend(env.getMasterConfiguration(), + backoff -> LOG.warn( + "failed to delete old replication queue data, sleep {} secs and retry later", + backoff / 1000, e)); + } + } + @Override protected Flow executeFromState(MasterProcedureEnv env, MigrateReplicationQueueFromZkToTableState state) @@ -179,23 +197,25 @@ public class MigrateReplicationQueueFromZkToTableProcedure waitUntilNoPeerProcedure(env); List<ReplicationPeerDescription> peers = env.getReplicationPeerManager().listPeers(null); if (peers.isEmpty()) { - LOG.info("No active replication peer found, delete old replication queue data and quit"); - ZKReplicationQueueStorageForMigration oldStorage = - new ZKReplicationQueueStorageForMigration(env.getMasterServices().getZooKeeper(), - env.getMasterConfiguration()); + // we will not load the region_replica_replication peer, so here we need to check the + // storage directly try { - oldStorage.deleteAllData(); - } catch (KeeperException e) { - throw suspend(env.getMasterConfiguration(), - backoff -> LOG.warn( - "failed to delete old replication queue data, sleep {} secs and retry later", - backoff / 1000, e)); + if (env.getReplicationPeerManager().hasRegionReplicaReplicationPeer()) { + LOG.info( + "No active replication peer found but we still have '{}' peer, need to " + "wait until all region servers are upgraded", + ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER); + setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING); + return 
Flow.HAS_MORE_STATE; + } + } catch (ReplicationException e) { + throw suspend(env.getMasterConfiguration(), backoff -> LOG + .warn("failed to list peer ids, sleep {} secs and retry later", backoff / 1000, e)); } + LOG.info("No active replication peer found, just clean up all replication queue data"); setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER); return Flow.HAS_MORE_STATE; } - // here we do not care the peers which have already been disabled, as later we do not need - // to enable them disabledPeerIds = peers.stream().filter(ReplicationPeerDescription::isEnabled) .map(ReplicationPeerDescription::getPeerId).collect(Collectors.toList()); setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER); @@ -240,13 +260,21 @@ public class MigrateReplicationQueueFromZkToTableProcedure rsWithLowerVersion, MIN_MAJOR_VERSION, backoff / 1000)); } case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER: - for (String peerId : disabledPeerIds) { - addChildProcedure(new EnablePeerProcedure(peerId)); + if (CollectionUtils.isNotEmpty(disabledPeerIds)) { + for (String peerId : disabledPeerIds) { + addChildProcedure(new EnablePeerProcedure(peerId)); + } } setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER); return Flow.HAS_MORE_STATE; case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER: enableReplicationLogCleaner(env); + setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_CLEAN_UP); + return Flow.HAS_MORE_STATE; + case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_CLEAN_UP: + // this is mainly for deleting the region replica replication queue data, but anyway, since + // we should have migrated all data, here we can simply delete everything + cleanup(env); return Flow.NO_MORE_STATE; default: throw new UnsupportedOperationException("unhandled state=" + state); @@ -298,7 +326,7 @@ public class MigrateReplicationQueueFromZkToTableProcedure super.serializeStateData(serializer); 
MigrateReplicationQueueFromZkToTableStateData.Builder builder = MigrateReplicationQueueFromZkToTableStateData.newBuilder(); - if (disabledPeerIds != null) { + if (CollectionUtils.isNotEmpty(disabledPeerIds)) { builder.addAllDisabledPeerId(disabledPeerIds); } serializer.serialize(builder.build()); @@ -309,6 +337,8 @@ public class MigrateReplicationQueueFromZkToTableProcedure super.deserializeStateData(serializer); MigrateReplicationQueueFromZkToTableStateData data = serializer.deserialize(MigrateReplicationQueueFromZkToTableStateData.class); - disabledPeerIds = data.getDisabledPeerIdList().stream().collect(Collectors.toList()); + if (data.getDisabledPeerIdCount() > 0) { + disabledPeerIds = data.getDisabledPeerIdList().stream().collect(Collectors.toList()); + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index ac9491834ae..63a2cd514df 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKConfig; @@ -688,26 +689,17 @@ public class ReplicationPeerManager implements ConfigurationObserver { ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME .equals(peerConfig.getReplicationEndpointImpl()) ) { - // we do not use this endpoint for region 
replication any more, see HBASE-26233 - LOG.info("Legacy region replication peer found, removing: {}", peerConfig); - // do it asynchronous to not block the start up of HMaster - new Thread("Remove legacy replication peer " + peerId) { - - @Override - public void run() { - try { - // need to delete two times to make sure we delete all the queues, see the comments in - // above - // removeAllQueues method for more details. - queueStorage.removeAllQueues(peerId); - queueStorage.removeAllQueues(peerId); - // delete queue first and then peer, because we use peer as a flag. - peerStorage.removePeer(peerId); - } catch (Exception e) { - LOG.warn("Failed to delete legacy replication peer {}", peerId); - } - } - }.start(); + // If memstore region replication is enabled, there will be a special replication peer + // usually called 'region_replica_replication'. We do not need to load it or migrate its + // replication queue data since we do not rely on general replication framework for + // region replication in 3.x now, please see HBASE-26233 for more details. + // We can not delete it now since region server with old version still want to update + // the replicated wal position to zk, if we delete the replication queue zk node, rs + // will crash. See HBASE-29169 for more details. 
+ In MigrateReplicationQueueFromZkToTableProcedure, finally we will call a deleteAllData on + the old replication queue storage, to make sure that we will delete the queue data + for this peer and also the peer info in replication peer storage LOG.info("Found old region replica replication peer '{}', skip loading it", peerId); continue; } peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig); @@ -816,6 +808,13 @@ public class ReplicationPeerManager implements ConfigurationObserver { return future; } + // this is for upgrading from 2.x to 3.x, in 3.x we will not load the 'region_replica_replication' + // peer, but we still need to know whether we have it on the old storage + boolean hasRegionReplicaReplicationPeer() throws ReplicationException { + return peerStorage.listPeerIds().stream() + .anyMatch(p -> p.equals(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER)); + } + + /** * Submit the migration tasks to the given {@code executor}. 
*/ @@ -833,4 +832,17 @@ public class ReplicationPeerManager implements ConfigurationObserver { runAsync(() -> migrateLastPushedSeqIds(oldStorage), executor), runAsync(() -> migrateHFileRefs(oldStorage), executor)); } + + void deleteLegacyRegionReplicaReplicationPeer() throws ReplicationException { + for (String peerId : peerStorage.listPeerIds()) { + ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId); + if ( + ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME + .equals(peerConfig.getReplicationEndpointImpl()) + ) { + LOG.info("Delete old region replica replication peer '{}'", peerId); + peerStorage.removePeer(peerId); + } + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueue.java index 1b0f727a072..5404cfab05b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueue.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueue.java @@ -17,25 +17,35 @@ */ package org.apache.hadoop.hbase.master.replication; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.Collections; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; import org.apache.hadoop.hbase.procedure2.Procedure; import 
org.apache.hadoop.hbase.replication.ReplicationGroupOffset; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationQueueData; +import org.apache.hadoop.hbase.replication.ReplicationQueueId; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.TestReplicationBase; import org.apache.hadoop.hbase.replication.ZKReplicationStorageBase; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZNodePaths; @@ -67,13 +77,27 @@ public class TestMigrateReplicationQueue extends TestReplicationBase { } private void mockData() throws Exception { + // fake a region_replica_replication peer and its queue data + ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder() + .setClusterKey("127.0.0.1:2181:/hbase") + .setReplicationEndpointImpl(ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME).build(); + HMaster master = UTIL1.getMiniHBaseCluster().getMaster(); + master.getReplicationPeerManager() + .addPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER, peerConfig, true); + ServerName rsName = UTIL1.getMiniHBaseCluster().getRegionServer(0).getServerName(); + master.getReplicationPeerManager().getQueueStorage().setOffset( + new ReplicationQueueId(rsName, ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER), "", + new ReplicationGroupOffset("test-wal-file", 0), Collections.emptyMap()); + // delete the replication queue table to simulate upgrading from an older version of hbase TableName replicationQueueTableName = TableName 
.valueOf(UTIL1.getConfiguration().get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString())); - List<ReplicationQueueData> queueDatas = UTIL1.getMiniHBaseCluster().getMaster() - .getReplicationPeerManager().getQueueStorage().listAllQueues(); - assertEquals(UTIL1.getMiniHBaseCluster().getRegionServerThreads().size(), queueDatas.size()); + List<ReplicationQueueData> queueDatas = + master.getReplicationPeerManager().getQueueStorage().listAllQueues(); + // have an extra mocked queue data for region_replica_replication peer + assertEquals(UTIL1.getMiniHBaseCluster().getRegionServerThreads().size() + 1, + queueDatas.size()); UTIL1.getAdmin().disableTable(replicationQueueTableName); UTIL1.getAdmin().deleteTable(replicationQueueTableName); // shutdown the hbase cluster @@ -106,9 +130,22 @@ public class TestMigrateReplicationQueue extends TestReplicationBase { assertTrue(UTIL1.getAdmin().tableExists(replicationQueueTableName)); ZKWatcher zk = UTIL1.getZooKeeperWatcher(); assertEquals(-1, ZKUtil.checkExists(zk, getQueuesZNode())); + // wait until MigrateReplicationQueueFromZkToTableProcedure finishes + UTIL1.waitFor(15000, () -> UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream() + .anyMatch(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure)); + UTIL1.waitFor(60000, + () -> UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream() + .filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure) + .allMatch(Procedure::isSuccess)); + // make sure the region_replica_replication peer is gone, and there is no data on zk + assertThat(UTIL1.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getPeerStorage() + .listPeerIds(), not(contains(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER))); + assertEquals(-1, ZKUtil.checkExists(zk, getQueuesZNode())); + // wait until SCP finishes, which means we can finish the claim queue operation 
UTIL1.waitFor(60000, () -> UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream() .filter(p -> p instanceof ServerCrashProcedure).allMatch(Procedure::isSuccess)); + List<ReplicationQueueData> queueDatas = UTIL1.getMiniHBaseCluster().getMaster() .getReplicationPeerManager().getQueueStorage().listAllQueues(); assertEquals(1, queueDatas.size()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestStartupWithLegacyRegionReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestStartupWithLegacyRegionReplicationEndpoint.java index 5af9edb8efc..ef04b97aada 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestStartupWithLegacyRegionReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestStartupWithLegacyRegionReplicationEndpoint.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SingleProcessHBaseCluster; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; -import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; @@ -108,9 +107,6 @@ public class TestStartupWithLegacyRegionReplicationEndpoint { () -> UTIL.getMiniHBaseCluster().getMaster().getProcedures().stream() .filter(p -> p instanceof ServerCrashProcedure).map(p -> (ServerCrashProcedure) p) .allMatch(Procedure::isSuccess)); - // the deletion is async, so wait until they get deleted - ReplicationPeerManager ppm = UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager(); - UTIL.waitFor(15000, () -> !ppm.getPeerStorage().listPeerIds().contains(peerId) - && 
ppm.getQueueStorage().listAllQueueIds(peerId, rsName).isEmpty()); + // we will delete the legacy peer while migrating, so here we do not assert the replication data } }
