This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-27109/table_based_rqs
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit f55419a04b73a72ffb3d22d9e982865a6e5fde43
Author: Duo Zhang <zhang...@apache.org>
AuthorDate: Sat Dec 3 20:51:40 2022 +0800

    HBASE-27430 Should disable replication log cleaner when migrating 
replication queue data (#4901)
    
    Signed-off-by: Liangjun He <heliang...@apache.org>
---
 .../protobuf/server/master/MasterProcedure.proto   | 12 +++---
 ...rateReplicationQueueFromZkToTableProcedure.java | 47 +++++++++++++++++++++-
 ...rateReplicationQueueFromZkToTableProcedure.java | 29 ++++++++++++-
 3 files changed, 80 insertions(+), 8 deletions(-)

diff --git 
a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto 
b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
index b6f5d7e50bb..14d07c17c88 100644
--- 
a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
+++ 
b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
@@ -724,11 +724,13 @@ message AssignReplicationQueuesStateData {
 }
 
 enum MigrateReplicationQueueFromZkToTableState {
-  MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE = 1;
-  MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER = 2;
-  MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE = 3;
-  MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING = 4;
-  MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER = 5;
+  MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER = 1;
+  MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE = 2;
+  MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER = 3;
+  MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE = 4;
+  MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING = 5;
+  MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER = 6;
+  MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER = 7;
 }
 
 message MigrateReplicationQueueFromZkToTableStateData {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
index 93ff27db3f7..b7c4e33ef85 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
@@ -17,7 +17,9 @@
  */
 package org.apache.hadoop.hbase.master.replication;
 
+import static 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER;
 import static 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER;
+import static 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER;
 import static 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER;
 import static 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE;
 import static 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE;
@@ -111,6 +113,26 @@ public class MigrateReplicationQueueFromZkToTableProcedure
     }
   }
 
+  private void disableReplicationLogCleaner(MasterProcedureEnv env)
+    throws ProcedureSuspendedException {
+    if 
(!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
+      // it is not likely that we can reach here as we will schedule this 
procedure immediately
+      // after master restarting, where ReplicationLogCleaner should have not 
started its first run
+      // yet. But anyway, let's make the code more robust. And it is safe to 
wait a bit here since
+      // there will be no data in the new replication queue storage before we 
execute this procedure
+      // so ReplicationLogCleaner will quit immediately without doing anything.
+      throw suspend(env.getMasterConfiguration(),
+        backoff -> LOG.info(
+          "Can not disable replication log cleaner, sleep {} secs and retry 
later",
+          backoff / 1000));
+    }
+    resetRetry();
+  }
+
+  private void enableReplicationLogCleaner(MasterProcedureEnv env) {
+    env.getReplicationPeerManager().getReplicationLogCleanerBarrier().enable();
+  }
+
   private void waitUntilNoPeerProcedure(MasterProcedureEnv env) throws 
ProcedureSuspendedException {
     long peerProcCount;
     try {
@@ -136,6 +158,10 @@ public class MigrateReplicationQueueFromZkToTableProcedure
     MigrateReplicationQueueFromZkToTableState state)
     throws ProcedureSuspendedException, ProcedureYieldException, 
InterruptedException {
     switch (state) {
+      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER:
+        disableReplicationLogCleaner(env);
+        setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE);
+        return Flow.HAS_MORE_STATE;
       case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE:
         waitUntilNoPeerProcedure(env);
         List<ReplicationPeerDescription> peers = 
env.getReplicationPeerManager().listPeers(null);
@@ -152,7 +178,8 @@ public class MigrateReplicationQueueFromZkToTableProcedure
                 "failed to delete old replication queue data, sleep {} secs 
and retry later",
                 backoff / 1000, e));
           }
-          return Flow.NO_MORE_STATE;
+          
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER);
+          return Flow.HAS_MORE_STATE;
         }
         // here we do not care the peers which have already been disabled, as 
later we do not need
         // to enable them
@@ -232,6 +259,10 @@ public class MigrateReplicationQueueFromZkToTableProcedure
         for (String peerId : disabledPeerIds) {
           addChildProcedure(new EnablePeerProcedure(peerId));
         }
+        
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER);
+        return Flow.HAS_MORE_STATE;
+      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER:
+        enableReplicationLogCleaner(env);
         return Flow.NO_MORE_STATE;
       default:
         throw new UnsupportedOperationException("unhandled state=" + state);
@@ -263,7 +294,19 @@ public class MigrateReplicationQueueFromZkToTableProcedure
 
   @Override
   protected MigrateReplicationQueueFromZkToTableState getInitialState() {
-    return MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE;
+    return MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER;
+  }
+
+  @Override
+  protected void afterReplay(MasterProcedureEnv env) {
+    if (getCurrentState() == getInitialState()) {
+      // do not need to disable log cleaner or acquire lock if we are in the 
initial state, later
+      // when executing the procedure we will try to disable and acquire.
+      return;
+    }
+    if 
(!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
+      throw new IllegalStateException("can not disable log cleaner, this 
should not happen");
+    }
   }
 
   @Override
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedure.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedure.java
index 752abc380b8..cb795edcd62 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedure.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedure.java
@@ -17,8 +17,11 @@
  */
 package org.apache.hadoop.hbase.master.replication;
 
+import static 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER;
 import static 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -48,6 +51,7 @@ import 
org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
 import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -102,6 +106,8 @@ public class 
TestMigrateReplicationQueueFromZkToTableProcedure {
 
   @BeforeClass
   public static void setupCluster() throws Exception {
+    // one hour, to make sure it will not run during the test
+    UTIL.getConfiguration().setInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL, 60 * 
60 * 1000);
     UTIL.startMiniCluster(
       
StartTestingClusterOption.builder().masterClass(HMasterForTest.class).build());
   }
@@ -193,8 +199,10 @@ public class 
TestMigrateReplicationQueueFromZkToTableProcedure {
     UTIL.waitFor(30000, () -> proc.isSuccess());
   }
 
+  // make sure we will disable replication peers while migrating
+  // and also tests disable/enable replication log cleaner and wait for region 
server upgrading
   @Test
-  public void testDisablePeerAndWaitUpgrading() throws Exception {
+  public void testDisablePeerAndWaitStates() throws Exception {
     String peerId = "2";
     ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
       .setClusterKey(UTIL.getZkCluster().getAddress().toString() + 
":/testhbase")
@@ -206,11 +214,22 @@ public class 
TestMigrateReplicationQueueFromZkToTableProcedure {
     EXTRA_REGION_SERVERS
       .put(ServerName.valueOf("localhost", 54321, 
EnvironmentEdgeManager.currentTime()), metrics);
 
+    ReplicationLogCleanerBarrier barrier = UTIL.getHBaseCluster().getMaster()
+      .getReplicationPeerManager().getReplicationLogCleanerBarrier();
+    assertTrue(barrier.start());
+
     ProcedureExecutor<MasterProcedureEnv> procExec = 
getMasterProcedureExecutor();
 
     MigrateReplicationQueueFromZkToTableProcedure proc =
       new MigrateReplicationQueueFromZkToTableProcedure();
     procExec.submitProcedure(proc);
+
+    Thread.sleep(5000);
+    // make sure we are still waiting for replication log cleaner quit
+    
assertEquals(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER.getNumber(),
+      proc.getCurrentStateId());
+    barrier.stop();
+
     // wait until we reach the wait upgrading state
     UTIL.waitFor(30000,
       () -> proc.getCurrentStateId()
@@ -218,9 +237,17 @@ public class 
TestMigrateReplicationQueueFromZkToTableProcedure {
         && proc.getState() == ProcedureState.WAITING_TIMEOUT);
     // make sure the peer is disabled for migrating
     assertFalse(UTIL.getAdmin().isReplicationPeerEnabled(peerId));
+    // make sure the replication log cleaner is disabled
+    assertFalse(barrier.start());
 
     // the procedure should finish successfully
     EXTRA_REGION_SERVERS.clear();
     UTIL.waitFor(30000, () -> proc.isSuccess());
+
+    // make sure the peer is enabled again
+    assertTrue(UTIL.getAdmin().isReplicationPeerEnabled(peerId));
+    // make sure the replication log cleaner is enabled again
+    assertTrue(barrier.start());
+    barrier.stop();
   }
 }

Reply via email to