This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 25fe3698758fd3c9d569ead774072ead0db05a19
Author: Duo Zhang <[email protected]>
AuthorDate: Thu Mar 13 22:02:59 2025 +0800

    HBASE-29169 Removing the legacy 'region_replica_replication' peer will 
crash region servers while rolling upgrading (#6772)
    
    Signed-off-by: Nihal Jain <[email protected]>
    (cherry picked from commit 46fcd3c1ec8c17826f8c36186de77d33f4609cab)
---
 .../protobuf/server/master/MasterProcedure.proto   |  1 +
 .../ZKReplicationQueueStorageForMigration.java     | 46 ++++++++++++++--
 .../replication/TestZKReplicationQueueStorage.java | 23 ++++++--
 ...rateReplicationQueueFromZkToTableProcedure.java | 62 ++++++++++++++++------
 .../master/replication/ReplicationPeerManager.java | 52 +++++++++++-------
 .../replication/TestMigrateReplicationQueue.java   | 43 +++++++++++++--
 ...StartupWithLegacyRegionReplicationEndpoint.java |  6 +--
 7 files changed, 183 insertions(+), 50 deletions(-)

diff --git 
a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto 
b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
index 81d16b2861c..e52a46ab358 100644
--- 
a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
+++ 
b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
@@ -787,6 +787,7 @@ enum MigrateReplicationQueueFromZkToTableState {
   MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING = 5;
   MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER = 6;
   MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER = 7;
+  MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_CLEAN_UP = 8;
 }
 
 message MigrateReplicationQueueFromZkToTableStateData {
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorageForMigration.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorageForMigration.java
index 22cc1314522..c44d3d62ce4 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorageForMigration.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorageForMigration.java
@@ -32,8 +32,11 @@ import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
+import 
org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 /**
  * Just retain a small set of the methods for the old zookeeper based 
replication queue storage, for
@@ -42,6 +45,9 @@ import 
org.apache.hbase.thirdparty.com.google.common.base.Splitter;
 @InterfaceAudience.Private
 public class ZKReplicationQueueStorageForMigration extends 
ZKReplicationStorageBase {
 
+  private static final Logger LOG =
+    LoggerFactory.getLogger(ZKReplicationQueueStorageForMigration.class);
+
   public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY =
     "zookeeper.znode.replication.hfile.refs";
   public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = 
"hfile-refs";
@@ -123,6 +129,8 @@ public class ZKReplicationQueueStorageForMigration extends 
ZKReplicationStorageB
     return getFileNode(getQueueNode(serverName, queueId), fileName);
   }
 
+  static final String REGION_REPLICA_REPLICATION_PEER = 
"region_replica_replication";
+
   @SuppressWarnings("unchecked")
   public MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> 
listAllQueues()
     throws KeeperException {
@@ -136,25 +144,57 @@ public class ZKReplicationQueueStorageForMigration 
extends ZKReplicationStorageB
 
       private ServerName previousServerName;
 
+      private boolean hasRegionReplicaReplicationQueue;
+
+      private void cleanupQueuesWithoutRegionReplicaReplication(ServerName 
serverName)
+        throws Exception {
+        String rsZNode = getRsNode(serverName);
+        List<String> queueIdList = ZKUtil.listChildrenNoWatch(zookeeper, 
rsZNode);
+        if (CollectionUtils.isEmpty(queueIdList)) {
+          return;
+        }
+        for (String queueId : queueIdList) {
+          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+          if (!queueInfo.getPeerId().equals(REGION_REPLICA_REPLICATION_PEER)) {
+            ZKUtil.deleteNodeRecursively(zookeeper, getQueueNode(serverName, 
queueId));
+          }
+        }
+      }
+
       @Override
       public Pair<ServerName, List<ZkReplicationQueueData>> next() throws 
Exception {
         if (previousServerName != null) {
-          ZKUtil.deleteNodeRecursively(zookeeper, 
getRsNode(previousServerName));
+          if (hasRegionReplicaReplicationQueue) {
+            // if there are region_replica_replication queues, we can not 
delete them, just delete
+            // other queues, see HBASE-29169.
+            cleanupQueuesWithoutRegionReplicaReplication(previousServerName);
+          } else {
+            ZKUtil.deleteNodeRecursively(zookeeper, 
getRsNode(previousServerName));
+          }
         }
         if (!iter.hasNext()) {
-          ZKUtil.deleteNodeRecursively(zookeeper, queuesZNode);
+          // If there are region_replica_replication queues then we can not 
delete the data right
+          // now, otherwise we may crash the old region servers, see 
HBASE-29169.
+          // The migration procedure has a special step to cleanup everything
           return null;
         }
+        hasRegionReplicaReplicationQueue = false;
         String replicator = iter.next();
         ServerName serverName = ServerName.parseServerName(replicator);
         previousServerName = serverName;
         List<String> queueIdList = ZKUtil.listChildrenNoWatch(zookeeper, 
getRsNode(serverName));
-        if (queueIdList == null || queueIdList.isEmpty()) {
+        if (CollectionUtils.isEmpty(queueIdList)) {
           return Pair.newPair(serverName, Collections.emptyList());
         }
         List<ZkReplicationQueueData> queueDataList = new 
ArrayList<>(queueIdList.size());
         for (String queueIdStr : queueIdList) {
           ReplicationQueueInfo queueInfo = new 
ReplicationQueueInfo(queueIdStr);
+          if (queueInfo.getPeerId().equals(REGION_REPLICA_REPLICATION_PEER)) {
+            // we do not need to migrate the data for this queue, skip
+            LOG.debug("Found region replica replication queue {}, skip", 
queueInfo);
+            hasRegionReplicaReplicationQueue = true;
+            continue;
+          }
           ReplicationQueueId queueId;
           if (queueInfo.getDeadRegionServers().isEmpty()) {
             queueId = new ReplicationQueueId(serverName, 
queueInfo.getPeerId());
diff --git 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
index e38b7b134e9..b6438667574 100644
--- 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
+++ 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
@@ -125,6 +125,11 @@ public class TestZKReplicationQueueStorage {
           ZKUtil.createWithParents(zk, wal);
         }
       }
+      if (i % 2 == 0) {
+        // add a region_replica_replication znode
+        ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(rsZNode,
+          
ZKReplicationQueueStorageForMigration.REGION_REPLICA_REPLICATION_PEER));
+      }
     }
     ZKUtil.createWithParents(zk,
       ZNodePaths.joinZNode(storage.getQueuesZNode(), deadServer.toString()));
@@ -218,8 +223,20 @@ public class TestZKReplicationQueueStorage {
       Pair<ServerName, List<ZkReplicationQueueData>> pair = iter.next();
       assertNotNull(pair);
       if (previousServerName != null) {
-        assertEquals(-1, ZKUtil.checkExists(zk,
-          ZNodePaths.joinZNode(storage.getQueuesZNode(), 
previousServerName.toString())));
+        int index = previousServerName.equals(deadServer)
+          ? -1
+          : Integer
+            
.parseInt(Iterables.getLast(Splitter.on('-').split(previousServerName.getHostname())));
+        if (index % 2 == 0) {
+          List<String> children = ZKUtil.listChildrenNoWatch(zk,
+            ZNodePaths.joinZNode(storage.getQueuesZNode(), 
previousServerName.toString()));
+          assertEquals(1, children.size());
+          
assertEquals(ZKReplicationQueueStorageForMigration.REGION_REPLICA_REPLICATION_PEER,
+            children.get(0));
+        } else {
+          assertEquals(-1, ZKUtil.checkExists(zk,
+            ZNodePaths.joinZNode(storage.getQueuesZNode(), 
previousServerName.toString())));
+        }
       }
       ServerName sn = pair.getFirst();
       previousServerName = sn;
@@ -258,7 +275,7 @@ public class TestZKReplicationQueueStorage {
       }
     }
     assertNull(iter.next());
-    assertEquals(-1, ZKUtil.checkExists(zk, storage.getQueuesZNode()));
+    assertEquals(nServers / 2, ZKUtil.listChildrenNoWatch(zk, 
storage.getQueuesZNode()).size());
   }
 
   @Test
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
index cff1b387936..f1b4cfd249f 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.master.replication;
 
+import static 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_CLEAN_UP;
 import static 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER;
 import static 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER;
 import static 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER;
@@ -42,9 +43,11 @@ import 
org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
 import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
 import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import 
org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
 import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
@@ -52,6 +55,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import 
org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import 
org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableStateData;
@@ -166,6 +170,20 @@ public class MigrateReplicationQueueFromZkToTableProcedure
     resetRetry();
   }
 
+  private void cleanup(MasterProcedureEnv env) throws 
ProcedureSuspendedException {
+    ZKReplicationQueueStorageForMigration oldStorage = new 
ZKReplicationQueueStorageForMigration(
+      env.getMasterServices().getZooKeeper(), env.getMasterConfiguration());
+    try {
+      oldStorage.deleteAllData();
+      
env.getReplicationPeerManager().deleteLegacyRegionReplicaReplicationPeer();
+    } catch (KeeperException | ReplicationException e) {
+      throw suspend(env.getMasterConfiguration(),
+        backoff -> LOG.warn(
+          "failed to delete old replication queue data, sleep {} secs and 
retry later",
+          backoff / 1000, e));
+    }
+  }
+
   @Override
   protected Flow executeFromState(MasterProcedureEnv env,
     MigrateReplicationQueueFromZkToTableState state)
@@ -179,23 +197,25 @@ public class MigrateReplicationQueueFromZkToTableProcedure
         waitUntilNoPeerProcedure(env);
         List<ReplicationPeerDescription> peers = 
env.getReplicationPeerManager().listPeers(null);
         if (peers.isEmpty()) {
-          LOG.info("No active replication peer found, delete old replication 
queue data and quit");
-          ZKReplicationQueueStorageForMigration oldStorage =
-            new 
ZKReplicationQueueStorageForMigration(env.getMasterServices().getZooKeeper(),
-              env.getMasterConfiguration());
+          // we will not load the region_replica_replication peer, so here we 
need to check the
+          // storage directly
           try {
-            oldStorage.deleteAllData();
-          } catch (KeeperException e) {
-            throw suspend(env.getMasterConfiguration(),
-              backoff -> LOG.warn(
-                "failed to delete old replication queue data, sleep {} secs 
and retry later",
-                backoff / 1000, e));
+            if 
(env.getReplicationPeerManager().hasRegionReplicaReplicationPeer()) {
+              LOG.info(
+                "No active replication peer found but we still have '{}' peer, 
need to"
+                  + " wait until all region servers are upgraded",
+                ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
+              
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING);
+              return Flow.HAS_MORE_STATE;
+            }
+          } catch (ReplicationException e) {
+            throw suspend(env.getMasterConfiguration(), backoff -> LOG
+              .warn("failed to list peer ids, sleep {} secs and retry later", 
backoff / 1000, e));
           }
+          LOG.info("No active replication peer found, just clean up all 
replication queue data");
           
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER);
           return Flow.HAS_MORE_STATE;
         }
-        // here we do not care the peers which have already been disabled, as 
later we do not need
-        // to enable them
         disabledPeerIds = 
peers.stream().filter(ReplicationPeerDescription::isEnabled)
           
.map(ReplicationPeerDescription::getPeerId).collect(Collectors.toList());
         setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER);
@@ -240,13 +260,21 @@ public class MigrateReplicationQueueFromZkToTableProcedure
               rsWithLowerVersion, MIN_MAJOR_VERSION, backoff / 1000));
         }
       case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER:
-        for (String peerId : disabledPeerIds) {
-          addChildProcedure(new EnablePeerProcedure(peerId));
+        if (CollectionUtils.isNotEmpty(disabledPeerIds)) {
+          for (String peerId : disabledPeerIds) {
+            addChildProcedure(new EnablePeerProcedure(peerId));
+          }
         }
         
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER);
         return Flow.HAS_MORE_STATE;
       case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER:
         enableReplicationLogCleaner(env);
+        setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_CLEAN_UP);
+        return Flow.HAS_MORE_STATE;
+      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_CLEAN_UP:
+        // this is mainly for deleting the region replica replication queue 
data, but anyway, since
+        // we should have migrated all data, here we can simply delete 
everything
+        cleanup(env);
         return Flow.NO_MORE_STATE;
       default:
         throw new UnsupportedOperationException("unhandled state=" + state);
@@ -298,7 +326,7 @@ public class MigrateReplicationQueueFromZkToTableProcedure
     super.serializeStateData(serializer);
     MigrateReplicationQueueFromZkToTableStateData.Builder builder =
       MigrateReplicationQueueFromZkToTableStateData.newBuilder();
-    if (disabledPeerIds != null) {
+    if (CollectionUtils.isNotEmpty(disabledPeerIds)) {
       builder.addAllDisabledPeerId(disabledPeerIds);
     }
     serializer.serialize(builder.build());
@@ -309,6 +337,8 @@ public class MigrateReplicationQueueFromZkToTableProcedure
     super.deserializeStateData(serializer);
     MigrateReplicationQueueFromZkToTableStateData data =
       
serializer.deserialize(MigrateReplicationQueueFromZkToTableStateData.class);
-    disabledPeerIds = 
data.getDisabledPeerIdList().stream().collect(Collectors.toList());
+    if (data.getDisabledPeerIdCount() > 0) {
+      disabledPeerIds = 
data.getDisabledPeerIdList().stream().collect(Collectors.toList());
+    }
   }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index ac9491834ae..63a2cd514df 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -76,6 +76,7 @@ import 
org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration
 import 
org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData;
 import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
@@ -688,26 +689,17 @@ public class ReplicationPeerManager implements 
ConfigurationObserver {
         ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
           .equals(peerConfig.getReplicationEndpointImpl())
       ) {
-        // we do not use this endpoint for region replication any more, see 
HBASE-26233
-        LOG.info("Legacy region replication peer found, removing: {}", 
peerConfig);
-        // do it asynchronous to not block the start up of HMaster
-        new Thread("Remove legacy replication peer " + peerId) {
-
-          @Override
-          public void run() {
-            try {
-              // need to delete two times to make sure we delete all the 
queues, see the comments in
-              // above
-              // removeAllQueues method for more details.
-              queueStorage.removeAllQueues(peerId);
-              queueStorage.removeAllQueues(peerId);
-              // delete queue first and then peer, because we use peer as a 
flag.
-              peerStorage.removePeer(peerId);
-            } catch (Exception e) {
-              LOG.warn("Failed to delete legacy replication peer {}", peerId);
-            }
-          }
-        }.start();
+        // If memstore region replication is enabled, there will be a special 
replication peer
+        // usually called 'region_replica_replication'. We do not need to load 
it or migrate its
+        // replication queue data since we do not rely on general replication 
framework for
+        // region replication in 3.x now, please see HBASE-26233 for more 
details.
+        // We can not delete it now since region server with old version still 
want to update
+        // the replicated wal position to zk, if we delete the replication 
queue zk node, rs
+        // will crash. See HBASE-29169 for more details.
+        // In MigrateReplicationQueueFromZkToTableProcedure, finally we will 
call a deleteAllData on
+        // the old replication queue storage, to make sure that we will delete 
the the queue data
+        // for this peer and also the peer info in replication peer storage
+        LOG.info("Found old region replica replication peer '{}', skip loading 
it", peerId);
         continue;
       }
       peerConfig = 
ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig);
@@ -816,6 +808,13 @@ public class ReplicationPeerManager implements 
ConfigurationObserver {
     return future;
   }
 
+  // this is for upgrading from 2.x to 3.x, in 3.x we will not load the 
'region_replica_replication'
+  // peer, but we still need to know whether we have it on the old storage
+  boolean hasRegionReplicaReplicationPeer() throws ReplicationException {
+    return peerStorage.listPeerIds().stream()
+      .anyMatch(p -> 
p.equals(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER));
+  }
+
   /**
    * Submit the migration tasks to the given {@code executor}.
    */
@@ -833,4 +832,17 @@ public class ReplicationPeerManager implements 
ConfigurationObserver {
       runAsync(() -> migrateLastPushedSeqIds(oldStorage), executor),
       runAsync(() -> migrateHFileRefs(oldStorage), executor));
   }
+
+  void deleteLegacyRegionReplicaReplicationPeer() throws ReplicationException {
+    for (String peerId : peerStorage.listPeerIds()) {
+      ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
+      if (
+        ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
+          .equals(peerConfig.getReplicationEndpointImpl())
+      ) {
+        LOG.info("Delete old region replica replication peer '{}'", peerId);
+        peerStorage.removePeer(peerId);
+      }
+    }
+  }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueue.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueue.java
index 1b0f727a072..5404cfab05b 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueue.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueue.java
@@ -17,25 +17,35 @@
  */
 package org.apache.hadoop.hbase.master.replication;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationQueueData;
+import org.apache.hadoop.hbase.replication.ReplicationQueueId;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.TestReplicationBase;
 import org.apache.hadoop.hbase.replication.ZKReplicationStorageBase;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
@@ -67,13 +77,27 @@ public class TestMigrateReplicationQueue extends 
TestReplicationBase {
   }
 
   private void mockData() throws Exception {
+    // fake a region_replica_replication peer and its queue data
+    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+      .setClusterKey("127.0.0.1:2181:/hbase")
+      
.setReplicationEndpointImpl(ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME).build();
+    HMaster master = UTIL1.getMiniHBaseCluster().getMaster();
+    master.getReplicationPeerManager()
+      .addPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER, 
peerConfig, true);
+    ServerName rsName = 
UTIL1.getMiniHBaseCluster().getRegionServer(0).getServerName();
+    master.getReplicationPeerManager().getQueueStorage().setOffset(
+      new ReplicationQueueId(rsName, 
ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER), "",
+      new ReplicationGroupOffset("test-wal-file", 0), Collections.emptyMap());
+
     // delete the replication queue table to simulate upgrading from an older 
version of hbase
     TableName replicationQueueTableName = TableName
       
.valueOf(UTIL1.getConfiguration().get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME,
         
ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()));
-    List<ReplicationQueueData> queueDatas = 
UTIL1.getMiniHBaseCluster().getMaster()
-      .getReplicationPeerManager().getQueueStorage().listAllQueues();
-    assertEquals(UTIL1.getMiniHBaseCluster().getRegionServerThreads().size(), 
queueDatas.size());
+    List<ReplicationQueueData> queueDatas =
+      master.getReplicationPeerManager().getQueueStorage().listAllQueues();
+    // have an extra mocked queue data for region_replica_replication peer
+    assertEquals(UTIL1.getMiniHBaseCluster().getRegionServerThreads().size() + 
1,
+      queueDatas.size());
     UTIL1.getAdmin().disableTable(replicationQueueTableName);
     UTIL1.getAdmin().deleteTable(replicationQueueTableName);
     // shutdown the hbase cluster
@@ -106,9 +130,22 @@ public class TestMigrateReplicationQueue extends 
TestReplicationBase {
     assertTrue(UTIL1.getAdmin().tableExists(replicationQueueTableName));
     ZKWatcher zk = UTIL1.getZooKeeperWatcher();
     assertEquals(-1, ZKUtil.checkExists(zk, getQueuesZNode()));
+    // wait until MigrateReplicationQueueFromZkToTableProcedure finishes
+    UTIL1.waitFor(15000, () -> 
UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream()
+      .anyMatch(p -> p instanceof 
MigrateReplicationQueueFromZkToTableProcedure));
+    UTIL1.waitFor(60000,
+      () -> UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream()
+        .filter(p -> p instanceof 
MigrateReplicationQueueFromZkToTableProcedure)
+        .allMatch(Procedure::isSuccess));
+    // make sure the region_replica_replication peer is gone, and there is no 
data on zk
+    
assertThat(UTIL1.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getPeerStorage()
+      .listPeerIds(), 
not(contains(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER)));
+    assertEquals(-1, ZKUtil.checkExists(zk, getQueuesZNode()));
+
     // wait until SCP finishes, which means we can finish the claim queue 
operation
     UTIL1.waitFor(60000, () -> 
UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream()
       .filter(p -> p instanceof 
ServerCrashProcedure).allMatch(Procedure::isSuccess));
+
     List<ReplicationQueueData> queueDatas = 
UTIL1.getMiniHBaseCluster().getMaster()
       .getReplicationPeerManager().getQueueStorage().listAllQueues();
     assertEquals(1, queueDatas.size());
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestStartupWithLegacyRegionReplicationEndpoint.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestStartupWithLegacyRegionReplicationEndpoint.java
index 5af9edb8efc..ef04b97aada 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestStartupWithLegacyRegionReplicationEndpoint.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestStartupWithLegacyRegionReplicationEndpoint.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
-import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -108,9 +107,6 @@ public class TestStartupWithLegacyRegionReplicationEndpoint 
{
       () -> UTIL.getMiniHBaseCluster().getMaster().getProcedures().stream()
         .filter(p -> p instanceof ServerCrashProcedure).map(p -> 
(ServerCrashProcedure) p)
         .allMatch(Procedure::isSuccess));
-    // the deletion is async, so wait until they get deleted
-    ReplicationPeerManager ppm = 
UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager();
-    UTIL.waitFor(15000, () -> 
!ppm.getPeerStorage().listPeerIds().contains(peerId)
-      && ppm.getQueueStorage().listAllQueueIds(peerId, rsName).isEmpty());
+    // we will delete the legacy peer while migrating, so here we do not 
assert the replication data
   }
 }

Reply via email to