[hbase] 09/10: HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure

2023-03-15 Thread zhangduo
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-27109/table_based_rqs
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 2fefe53e047564036d9e2a158c9adb1b1a9e3fb8
Author: Duo Zhang 
AuthorDate: Tue Oct 18 16:46:03 2022 +0800

HBASE-27429 Add exponential retry backoff support for 
MigrateReplicationQueueFromZkToTableProcedure

Signed-off-by: Liangjun He 
---
 .../hbase/procedure2/TimeoutExecutorThread.java|  10 +-
 ...rateReplicationQueueFromZkToTableProcedure.java | 131 ++---
 .../master/replication/ReplicationPeerManager.java |  45 ---
 ...tReplicationPeerManagerMigrateQueuesFromZk.java |   9 +-
 4 files changed, 125 insertions(+), 70 deletions(-)

diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
index 3b99781a558..c0287a99435 100644
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
@@ -78,9 +78,13 @@ class TimeoutExecutorThread extends 
StoppableThread {
   }
 
   public void add(Procedure procedure) {
-LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, 
procedure.getTimeout(),
-  procedure.getTimeoutTimestamp());
-queue.add(new DelayedProcedure<>(procedure));
+if (procedure.getTimeout() > 0) {
+  LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, 
procedure.getTimeout(),
+procedure.getTimeoutTimestamp());
+  queue.add(new DelayedProcedure<>(procedure));
+} else {
+  LOG.info("Got negative timeout {} for {}, skip adding", 
procedure.getTimeout(), procedure);
+}
   }
 
   public boolean remove(Procedure procedure) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
index 536f232338e..93ff27db3f7 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
@@ -25,19 +25,25 @@ import static 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureP
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.function.LongConsumer;
 import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
 import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import 
org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.IdLock;
+import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
@@ -65,18 +71,34 @@ public class MigrateReplicationQueueFromZkToTableProcedure
 
   private List disabledPeerIds;
 
-  private List> futures;
+  private CompletableFuture future;
 
   private ExecutorService executor;
 
+  private RetryCounter retryCounter;
+
   @Override
   public String getGlobalId() {
 return getClass().getSimpleName();
   }
 
+  private ProcedureSuspendedException suspend(Configuration conf, LongConsumer 
backoffConsumer)
+throws ProcedureSuspendedException {
+if (retryCounter == null) {
+  retryCounter = ProcedureUtil.createRetryCounter(conf);
+}
+long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
+backoffConsumer.accept(backoff);
+throw suspend(Math.toIntExact(backoff), true);
+  }
+
+  private void resetRetry() {
+retryCounter = null;
+  }
+
   private ExecutorService getExecutorService() {
 if (executor == null) {
-  executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder()
+  executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
 

[hbase] 09/10: HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure

2023-03-12 Thread zhangduo
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-27109/table_based_rqs
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 00a25cf332c9951457e19789f512ad3fe003cfdb
Author: Duo Zhang 
AuthorDate: Tue Oct 18 16:46:03 2022 +0800

HBASE-27429 Add exponential retry backoff support for 
MigrateReplicationQueueFromZkToTableProcedure

Signed-off-by: Liangjun He 
---
 .../hbase/procedure2/TimeoutExecutorThread.java|  10 +-
 ...rateReplicationQueueFromZkToTableProcedure.java | 131 ++---
 .../master/replication/ReplicationPeerManager.java |  45 ---
 ...tReplicationPeerManagerMigrateQueuesFromZk.java |   9 +-
 4 files changed, 125 insertions(+), 70 deletions(-)

diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
index 3b99781a558..c0287a99435 100644
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
@@ -78,9 +78,13 @@ class TimeoutExecutorThread extends 
StoppableThread {
   }
 
   public void add(Procedure procedure) {
-LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, 
procedure.getTimeout(),
-  procedure.getTimeoutTimestamp());
-queue.add(new DelayedProcedure<>(procedure));
+if (procedure.getTimeout() > 0) {
+  LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, 
procedure.getTimeout(),
+procedure.getTimeoutTimestamp());
+  queue.add(new DelayedProcedure<>(procedure));
+} else {
+  LOG.info("Got negative timeout {} for {}, skip adding", 
procedure.getTimeout(), procedure);
+}
   }
 
   public boolean remove(Procedure procedure) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
index 536f232338e..93ff27db3f7 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
@@ -25,19 +25,25 @@ import static 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureP
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.function.LongConsumer;
 import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
 import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import 
org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.IdLock;
+import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
@@ -65,18 +71,34 @@ public class MigrateReplicationQueueFromZkToTableProcedure
 
   private List disabledPeerIds;
 
-  private List> futures;
+  private CompletableFuture future;
 
   private ExecutorService executor;
 
+  private RetryCounter retryCounter;
+
   @Override
   public String getGlobalId() {
 return getClass().getSimpleName();
   }
 
+  private ProcedureSuspendedException suspend(Configuration conf, LongConsumer 
backoffConsumer)
+throws ProcedureSuspendedException {
+if (retryCounter == null) {
+  retryCounter = ProcedureUtil.createRetryCounter(conf);
+}
+long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
+backoffConsumer.accept(backoff);
+throw suspend(Math.toIntExact(backoff), true);
+  }
+
+  private void resetRetry() {
+retryCounter = null;
+  }
+
   private ExecutorService getExecutorService() {
 if (executor == null) {
-  executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder()
+  executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
 

[hbase] 09/10: HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure

2023-03-08 Thread zhangduo
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-27109/table_based_rqs
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 3c54d9c27a9a6e147580cbeab67186b18019785b
Author: Duo Zhang 
AuthorDate: Tue Oct 18 16:46:03 2022 +0800

HBASE-27429 Add exponential retry backoff support for 
MigrateReplicationQueueFromZkToTableProcedure

Signed-off-by: Liangjun He 
---
 .../hbase/procedure2/TimeoutExecutorThread.java|  10 +-
 ...rateReplicationQueueFromZkToTableProcedure.java | 131 ++---
 .../master/replication/ReplicationPeerManager.java |  45 ---
 ...tReplicationPeerManagerMigrateQueuesFromZk.java |   9 +-
 4 files changed, 125 insertions(+), 70 deletions(-)

diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
index 3b99781a558..c0287a99435 100644
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
@@ -78,9 +78,13 @@ class TimeoutExecutorThread extends 
StoppableThread {
   }
 
   public void add(Procedure procedure) {
-LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, 
procedure.getTimeout(),
-  procedure.getTimeoutTimestamp());
-queue.add(new DelayedProcedure<>(procedure));
+if (procedure.getTimeout() > 0) {
+  LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, 
procedure.getTimeout(),
+procedure.getTimeoutTimestamp());
+  queue.add(new DelayedProcedure<>(procedure));
+} else {
+  LOG.info("Got negative timeout {} for {}, skip adding", 
procedure.getTimeout(), procedure);
+}
   }
 
   public boolean remove(Procedure procedure) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
index 536f232338e..93ff27db3f7 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
@@ -25,19 +25,25 @@ import static 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureP
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.function.LongConsumer;
 import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
 import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import 
org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.IdLock;
+import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
@@ -65,18 +71,34 @@ public class MigrateReplicationQueueFromZkToTableProcedure
 
   private List disabledPeerIds;
 
-  private List> futures;
+  private CompletableFuture future;
 
   private ExecutorService executor;
 
+  private RetryCounter retryCounter;
+
   @Override
   public String getGlobalId() {
 return getClass().getSimpleName();
   }
 
+  private ProcedureSuspendedException suspend(Configuration conf, LongConsumer 
backoffConsumer)
+throws ProcedureSuspendedException {
+if (retryCounter == null) {
+  retryCounter = ProcedureUtil.createRetryCounter(conf);
+}
+long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
+backoffConsumer.accept(backoff);
+throw suspend(Math.toIntExact(backoff), true);
+  }
+
+  private void resetRetry() {
+retryCounter = null;
+  }
+
   private ExecutorService getExecutorService() {
 if (executor == null) {
-  executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder()
+  executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
 

[hbase] 09/10: HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure

2023-03-06 Thread zhangduo
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-27109/table_based_rqs
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 32fd20812ed1291b8e83bb5c3b3c9fe8358c4251
Author: Duo Zhang 
AuthorDate: Tue Oct 18 16:46:03 2022 +0800

HBASE-27429 Add exponential retry backoff support for 
MigrateReplicationQueueFromZkToTableProcedure

Signed-off-by: Liangjun He 
---
 .../hbase/procedure2/TimeoutExecutorThread.java|  10 +-
 ...rateReplicationQueueFromZkToTableProcedure.java | 131 ++---
 .../master/replication/ReplicationPeerManager.java |  45 ---
 ...tReplicationPeerManagerMigrateQueuesFromZk.java |   9 +-
 4 files changed, 125 insertions(+), 70 deletions(-)

diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
index 3b99781a558..c0287a99435 100644
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
@@ -78,9 +78,13 @@ class TimeoutExecutorThread extends 
StoppableThread {
   }
 
   public void add(Procedure procedure) {
-LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, 
procedure.getTimeout(),
-  procedure.getTimeoutTimestamp());
-queue.add(new DelayedProcedure<>(procedure));
+if (procedure.getTimeout() > 0) {
+  LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, 
procedure.getTimeout(),
+procedure.getTimeoutTimestamp());
+  queue.add(new DelayedProcedure<>(procedure));
+} else {
+  LOG.info("Got negative timeout {} for {}, skip adding", 
procedure.getTimeout(), procedure);
+}
   }
 
   public boolean remove(Procedure procedure) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
index 536f232338e..93ff27db3f7 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
@@ -25,19 +25,25 @@ import static 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureP
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.function.LongConsumer;
 import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
 import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import 
org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.IdLock;
+import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
@@ -65,18 +71,34 @@ public class MigrateReplicationQueueFromZkToTableProcedure
 
   private List disabledPeerIds;
 
-  private List> futures;
+  private CompletableFuture future;
 
   private ExecutorService executor;
 
+  private RetryCounter retryCounter;
+
   @Override
   public String getGlobalId() {
 return getClass().getSimpleName();
   }
 
+  private ProcedureSuspendedException suspend(Configuration conf, LongConsumer 
backoffConsumer)
+throws ProcedureSuspendedException {
+if (retryCounter == null) {
+  retryCounter = ProcedureUtil.createRetryCounter(conf);
+}
+long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
+backoffConsumer.accept(backoff);
+throw suspend(Math.toIntExact(backoff), true);
+  }
+
+  private void resetRetry() {
+retryCounter = null;
+  }
+
   private ExecutorService getExecutorService() {
 if (executor == null) {
-  executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder()
+  executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
 

[hbase] 09/10: HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure

2023-02-02 Thread zhangduo
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-27109/table_based_rqs
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 6085f14d3238dd5935084aedeaef348cd06f4222
Author: Duo Zhang 
AuthorDate: Tue Oct 18 16:46:03 2022 +0800

HBASE-27429 Add exponential retry backoff support for 
MigrateReplicationQueueFromZkToTableProcedure

Signed-off-by: Liangjun He 
---
 .../hbase/procedure2/TimeoutExecutorThread.java|  10 +-
 ...rateReplicationQueueFromZkToTableProcedure.java | 131 ++---
 .../master/replication/ReplicationPeerManager.java |  45 ---
 ...tReplicationPeerManagerMigrateQueuesFromZk.java |   9 +-
 4 files changed, 125 insertions(+), 70 deletions(-)

diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
index 3b99781a558..c0287a99435 100644
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
@@ -78,9 +78,13 @@ class TimeoutExecutorThread extends 
StoppableThread {
   }
 
   public void add(Procedure procedure) {
-LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, 
procedure.getTimeout(),
-  procedure.getTimeoutTimestamp());
-queue.add(new DelayedProcedure<>(procedure));
+if (procedure.getTimeout() > 0) {
+  LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, 
procedure.getTimeout(),
+procedure.getTimeoutTimestamp());
+  queue.add(new DelayedProcedure<>(procedure));
+} else {
+  LOG.info("Got negative timeout {} for {}, skip adding", 
procedure.getTimeout(), procedure);
+}
   }
 
   public boolean remove(Procedure procedure) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
index 536f232338e..93ff27db3f7 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
@@ -25,19 +25,25 @@ import static 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureP
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.function.LongConsumer;
 import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
 import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import 
org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.IdLock;
+import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
@@ -65,18 +71,34 @@ public class MigrateReplicationQueueFromZkToTableProcedure
 
   private List disabledPeerIds;
 
-  private List> futures;
+  private CompletableFuture future;
 
   private ExecutorService executor;
 
+  private RetryCounter retryCounter;
+
   @Override
   public String getGlobalId() {
 return getClass().getSimpleName();
   }
 
+  private ProcedureSuspendedException suspend(Configuration conf, LongConsumer 
backoffConsumer)
+throws ProcedureSuspendedException {
+if (retryCounter == null) {
+  retryCounter = ProcedureUtil.createRetryCounter(conf);
+}
+long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
+backoffConsumer.accept(backoff);
+throw suspend(Math.toIntExact(backoff), true);
+  }
+
+  private void resetRetry() {
+retryCounter = null;
+  }
+
   private ExecutorService getExecutorService() {
 if (executor == null) {
-  executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder()
+  executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
 

[hbase] 09/10: HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure

2023-01-30 Thread zhangduo
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-27109/table_based_rqs
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 0c27c60e12f377802dbad4b29ebde5a319a7fef8
Author: Duo Zhang 
AuthorDate: Tue Oct 18 16:46:03 2022 +0800

HBASE-27429 Add exponential retry backoff support for 
MigrateReplicationQueueFromZkToTableProcedure

Signed-off-by: Liangjun He 
---
 .../hbase/procedure2/TimeoutExecutorThread.java|  10 +-
 ...rateReplicationQueueFromZkToTableProcedure.java | 131 ++---
 .../master/replication/ReplicationPeerManager.java |  45 ---
 ...tReplicationPeerManagerMigrateQueuesFromZk.java |   9 +-
 4 files changed, 125 insertions(+), 70 deletions(-)

diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
index 3b99781a558..c0287a99435 100644
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
@@ -78,9 +78,13 @@ class TimeoutExecutorThread extends 
StoppableThread {
   }
 
   public void add(Procedure procedure) {
-LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, 
procedure.getTimeout(),
-  procedure.getTimeoutTimestamp());
-queue.add(new DelayedProcedure<>(procedure));
+if (procedure.getTimeout() > 0) {
+  LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, 
procedure.getTimeout(),
+procedure.getTimeoutTimestamp());
+  queue.add(new DelayedProcedure<>(procedure));
+} else {
+  LOG.info("Got negative timeout {} for {}, skip adding", 
procedure.getTimeout(), procedure);
+}
   }
 
   public boolean remove(Procedure procedure) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
index 536f232338e..93ff27db3f7 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
@@ -25,19 +25,25 @@ import static 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureP
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.function.LongConsumer;
 import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
 import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import 
org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.IdLock;
+import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
@@ -65,18 +71,34 @@ public class MigrateReplicationQueueFromZkToTableProcedure
 
   private List disabledPeerIds;
 
-  private List> futures;
+  private CompletableFuture future;
 
   private ExecutorService executor;
 
+  private RetryCounter retryCounter;
+
   @Override
   public String getGlobalId() {
 return getClass().getSimpleName();
   }
 
+  private ProcedureSuspendedException suspend(Configuration conf, LongConsumer 
backoffConsumer)
+throws ProcedureSuspendedException {
+if (retryCounter == null) {
+  retryCounter = ProcedureUtil.createRetryCounter(conf);
+}
+long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
+backoffConsumer.accept(backoff);
+throw suspend(Math.toIntExact(backoff), true);
+  }
+
+  private void resetRetry() {
+retryCounter = null;
+  }
+
   private ExecutorService getExecutorService() {
 if (executor == null) {
-  executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder()
+  executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
 

[hbase] 09/10: HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure

2023-01-19 Thread zhangduo
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-27109/table_based_rqs
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit f852b3f8cb25439cfe6330ff65f0764ef3b42c8c
Author: Duo Zhang 
AuthorDate: Tue Oct 18 16:46:03 2022 +0800

HBASE-27429 Add exponential retry backoff support for 
MigrateReplicationQueueFromZkToTableProcedure

Signed-off-by: Liangjun He 
---
 .../hbase/procedure2/TimeoutExecutorThread.java|  10 +-
 ...rateReplicationQueueFromZkToTableProcedure.java | 131 ++---
 .../master/replication/ReplicationPeerManager.java |  45 ---
 ...tReplicationPeerManagerMigrateQueuesFromZk.java |   9 +-
 4 files changed, 125 insertions(+), 70 deletions(-)

diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
index 3b99781a558..c0287a99435 100644
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
@@ -78,9 +78,13 @@ class TimeoutExecutorThread extends 
StoppableThread {
   }
 
   public void add(Procedure procedure) {
-LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, 
procedure.getTimeout(),
-  procedure.getTimeoutTimestamp());
-queue.add(new DelayedProcedure<>(procedure));
+if (procedure.getTimeout() > 0) {
+  LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, 
procedure.getTimeout(),
+procedure.getTimeoutTimestamp());
+  queue.add(new DelayedProcedure<>(procedure));
+} else {
+  LOG.info("Got negative timeout {} for {}, skip adding", 
procedure.getTimeout(), procedure);
+}
   }
 
   public boolean remove(Procedure procedure) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
index 536f232338e..93ff27db3f7 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
@@ -25,19 +25,25 @@ import static 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureP
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.function.LongConsumer;
 import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
 import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import 
org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.IdLock;
+import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
@@ -65,18 +71,34 @@ public class MigrateReplicationQueueFromZkToTableProcedure
 
   private List disabledPeerIds;
 
-  private List> futures;
+  private CompletableFuture future;
 
   private ExecutorService executor;
 
+  private RetryCounter retryCounter;
+
   @Override
   public String getGlobalId() {
 return getClass().getSimpleName();
   }
 
+  private ProcedureSuspendedException suspend(Configuration conf, LongConsumer 
backoffConsumer)
+throws ProcedureSuspendedException {
+if (retryCounter == null) {
+  retryCounter = ProcedureUtil.createRetryCounter(conf);
+}
+long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
+backoffConsumer.accept(backoff);
+throw suspend(Math.toIntExact(backoff), true);
+  }
+
+  private void resetRetry() {
+retryCounter = null;
+  }
+
   private ExecutorService getExecutorService() {
 if (executor == null) {
-  executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder()
+  executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
 

[hbase] 09/10: HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure

2023-01-18 Thread zhangduo
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-27109/table_based_rqs
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit ac878a550d23e1e4210fac0a4a6cecc0e8a5cb1c
Author: Duo Zhang 
AuthorDate: Tue Oct 18 16:46:03 2022 +0800

HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure

Signed-off-by: Liangjun He 
---
 .../hbase/procedure2/TimeoutExecutorThread.java|  10 +-
 ...rateReplicationQueueFromZkToTableProcedure.java | 131 ++---
 .../master/replication/ReplicationPeerManager.java |  45 ---
 ...tReplicationPeerManagerMigrateQueuesFromZk.java |   9 +-
 4 files changed, 125 insertions(+), 70 deletions(-)

diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
index 3b99781a558..c0287a99435 100644
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
@@ -78,9 +78,13 @@ class TimeoutExecutorThread extends 
StoppableThread {
   }
 
   public void add(Procedure procedure) {
-LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, 
procedure.getTimeout(),
-  procedure.getTimeoutTimestamp());
-queue.add(new DelayedProcedure<>(procedure));
+if (procedure.getTimeout() > 0) {
+  LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, 
procedure.getTimeout(),
+procedure.getTimeoutTimestamp());
+  queue.add(new DelayedProcedure<>(procedure));
+} else {
+  LOG.info("Got negative timeout {} for {}, skip adding", 
procedure.getTimeout(), procedure);
+}
   }
 
   public boolean remove(Procedure procedure) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
index 536f232338e..93ff27db3f7 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
@@ -25,19 +25,25 @@ import static 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureP
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.function.LongConsumer;
 import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
 import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import 
org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.IdLock;
+import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
@@ -65,18 +71,34 @@ public class MigrateReplicationQueueFromZkToTableProcedure
 
   private List disabledPeerIds;
 
-  private List> futures;
+  private CompletableFuture future;
 
   private ExecutorService executor;
 
+  private RetryCounter retryCounter;
+
   @Override
   public String getGlobalId() {
 return getClass().getSimpleName();
   }
 
+  private ProcedureSuspendedException suspend(Configuration conf, LongConsumer 
backoffConsumer)
+throws ProcedureSuspendedException {
+if (retryCounter == null) {
+  retryCounter = ProcedureUtil.createRetryCounter(conf);
+}
+long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
+backoffConsumer.accept(backoff);
+throw suspend(Math.toIntExact(backoff), true);
+  }
+
+  private void resetRetry() {
+retryCounter = null;
+  }
+
   private ExecutorService getExecutorService() {
 if (executor == null) {
-  executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder()
+  executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
 

[hbase] 09/10: HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure

2023-01-09 Thread zhangduo
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-27109/table_based_rqs
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 233b2e88518a44bd6bc4dd2ef407f1ae822e4a3c
Author: Duo Zhang 
AuthorDate: Tue Oct 18 16:46:03 2022 +0800

HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure

Signed-off-by: Liangjun He 
---
 .../hbase/procedure2/TimeoutExecutorThread.java|  10 +-
 ...rateReplicationQueueFromZkToTableProcedure.java | 131 ++---
 .../master/replication/ReplicationPeerManager.java |  45 ---
 ...tReplicationPeerManagerMigrateQueuesFromZk.java |   9 +-
 4 files changed, 125 insertions(+), 70 deletions(-)

diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
index 3b99781a558..c0287a99435 100644
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
@@ -78,9 +78,13 @@ class TimeoutExecutorThread extends 
StoppableThread {
   }
 
   public void add(Procedure procedure) {
-LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, 
procedure.getTimeout(),
-  procedure.getTimeoutTimestamp());
-queue.add(new DelayedProcedure<>(procedure));
+if (procedure.getTimeout() > 0) {
+  LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, 
procedure.getTimeout(),
+procedure.getTimeoutTimestamp());
+  queue.add(new DelayedProcedure<>(procedure));
+} else {
+  LOG.info("Got negative timeout {} for {}, skip adding", 
procedure.getTimeout(), procedure);
+}
   }
 
   public boolean remove(Procedure procedure) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
index 536f232338e..93ff27db3f7 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
@@ -25,19 +25,25 @@ import static 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureP
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.function.LongConsumer;
 import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
 import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import 
org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.IdLock;
+import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
@@ -65,18 +71,34 @@ public class MigrateReplicationQueueFromZkToTableProcedure
 
   private List disabledPeerIds;
 
-  private List> futures;
+  private CompletableFuture future;
 
   private ExecutorService executor;
 
+  private RetryCounter retryCounter;
+
   @Override
   public String getGlobalId() {
 return getClass().getSimpleName();
   }
 
+  private ProcedureSuspendedException suspend(Configuration conf, LongConsumer 
backoffConsumer)
+throws ProcedureSuspendedException {
+if (retryCounter == null) {
+  retryCounter = ProcedureUtil.createRetryCounter(conf);
+}
+long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
+backoffConsumer.accept(backoff);
+throw suspend(Math.toIntExact(backoff), true);
+  }
+
+  private void resetRetry() {
+retryCounter = null;
+  }
+
   private ExecutorService getExecutorService() {
 if (executor == null) {
-  executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder()
+  executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
 

[hbase] 09/10: HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure

2022-12-03 Thread zhangduo
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-27109/table_based_rqs
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit d218c775beb4538b7a7a4d64347b469f310903ed
Author: Duo Zhang 
AuthorDate: Tue Oct 18 16:46:03 2022 +0800

HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure

Signed-off-by: Liangjun He 
---
 .../hbase/procedure2/TimeoutExecutorThread.java|  10 +-
 ...rateReplicationQueueFromZkToTableProcedure.java | 131 ++---
 .../master/replication/ReplicationPeerManager.java |  45 ---
 ...tReplicationPeerManagerMigrateQueuesFromZk.java |   9 +-
 4 files changed, 125 insertions(+), 70 deletions(-)

diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
index 3b99781a558..c0287a99435 100644
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
@@ -78,9 +78,13 @@ class TimeoutExecutorThread extends 
StoppableThread {
   }
 
   public void add(Procedure procedure) {
-LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, 
procedure.getTimeout(),
-  procedure.getTimeoutTimestamp());
-queue.add(new DelayedProcedure<>(procedure));
+if (procedure.getTimeout() > 0) {
+  LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, 
procedure.getTimeout(),
+procedure.getTimeoutTimestamp());
+  queue.add(new DelayedProcedure<>(procedure));
+} else {
+  LOG.info("Got negative timeout {} for {}, skip adding", 
procedure.getTimeout(), procedure);
+}
   }
 
   public boolean remove(Procedure procedure) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
index 536f232338e..93ff27db3f7 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
@@ -25,19 +25,25 @@ import static 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureP
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.function.LongConsumer;
 import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
 import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import 
org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.IdLock;
+import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
@@ -65,18 +71,34 @@ public class MigrateReplicationQueueFromZkToTableProcedure
 
   private List disabledPeerIds;
 
-  private List> futures;
+  private CompletableFuture future;
 
   private ExecutorService executor;
 
+  private RetryCounter retryCounter;
+
   @Override
   public String getGlobalId() {
 return getClass().getSimpleName();
   }
 
+  private ProcedureSuspendedException suspend(Configuration conf, LongConsumer 
backoffConsumer)
+throws ProcedureSuspendedException {
+if (retryCounter == null) {
+  retryCounter = ProcedureUtil.createRetryCounter(conf);
+}
+long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
+backoffConsumer.accept(backoff);
+throw suspend(Math.toIntExact(backoff), true);
+  }
+
+  private void resetRetry() {
+retryCounter = null;
+  }
+
   private ExecutorService getExecutorService() {
 if (executor == null) {
-  executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder()
+  executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()