[hbase] 09/10: HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure
This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch HBASE-27109/table_based_rqs in repository https://gitbox.apache.org/repos/asf/hbase.git commit 2fefe53e047564036d9e2a158c9adb1b1a9e3fb8 Author: Duo Zhang AuthorDate: Tue Oct 18 16:46:03 2022 +0800 HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure Signed-off-by: Liangjun He --- .../hbase/procedure2/TimeoutExecutorThread.java| 10 +- ...rateReplicationQueueFromZkToTableProcedure.java | 131 ++--- .../master/replication/ReplicationPeerManager.java | 45 --- ...tReplicationPeerManagerMigrateQueuesFromZk.java | 9 +- 4 files changed, 125 insertions(+), 70 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java index 3b99781a558..c0287a99435 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java @@ -78,9 +78,13 @@ class TimeoutExecutorThread extends StoppableThread { } public void add(Procedure procedure) { -LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(), - procedure.getTimeoutTimestamp()); -queue.add(new DelayedProcedure<>(procedure)); +if (procedure.getTimeout() > 0) { + LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(), +procedure.getTimeoutTimestamp()); + queue.add(new DelayedProcedure<>(procedure)); +} else { + LOG.info("Got negative timeout {} for {}, skip adding", procedure.getTimeout(), procedure); +} } public boolean remove(Procedure procedure) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java index 536f232338e..93ff27db3f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java @@ -25,19 +25,25 @@ import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureP import java.io.IOException; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.function.LongConsumer; import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration; +import org.apache.hadoop.hbase.util.FutureUtils; +import org.apache.hadoop.hbase.util.IdLock; +import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; @@ -65,18 +71,34 @@ public class MigrateReplicationQueueFromZkToTableProcedure private List disabledPeerIds; - private List> futures; + private CompletableFuture future; private ExecutorService executor; + private RetryCounter retryCounter; + @Override public String getGlobalId() { return getClass().getSimpleName(); } + private ProcedureSuspendedException suspend(Configuration conf, LongConsumer backoffConsumer) +throws ProcedureSuspendedException { +if (retryCounter == null) { + retryCounter = ProcedureUtil.createRetryCounter(conf); +} +long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); +backoffConsumer.accept(backoff); +throw suspend(Math.toIntExact(backoff), true); + } + + private void resetRetry() { +retryCounter = null; + } + private ExecutorService getExecutorService() { if (executor == null) { - executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder() + executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
[hbase] 09/10: HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure
This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch HBASE-27109/table_based_rqs in repository https://gitbox.apache.org/repos/asf/hbase.git commit 00a25cf332c9951457e19789f512ad3fe003cfdb Author: Duo Zhang AuthorDate: Tue Oct 18 16:46:03 2022 +0800 HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure Signed-off-by: Liangjun He --- .../hbase/procedure2/TimeoutExecutorThread.java| 10 +- ...rateReplicationQueueFromZkToTableProcedure.java | 131 ++--- .../master/replication/ReplicationPeerManager.java | 45 --- ...tReplicationPeerManagerMigrateQueuesFromZk.java | 9 +- 4 files changed, 125 insertions(+), 70 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java index 3b99781a558..c0287a99435 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java @@ -78,9 +78,13 @@ class TimeoutExecutorThread extends StoppableThread { } public void add(Procedure procedure) { -LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(), - procedure.getTimeoutTimestamp()); -queue.add(new DelayedProcedure<>(procedure)); +if (procedure.getTimeout() > 0) { + LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(), +procedure.getTimeoutTimestamp()); + queue.add(new DelayedProcedure<>(procedure)); +} else { + LOG.info("Got negative timeout {} for {}, skip adding", procedure.getTimeout(), procedure); +} } public boolean remove(Procedure procedure) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java index 536f232338e..93ff27db3f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java @@ -25,19 +25,25 @@ import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureP import java.io.IOException; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.function.LongConsumer; import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration; +import org.apache.hadoop.hbase.util.FutureUtils; +import org.apache.hadoop.hbase.util.IdLock; +import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; @@ -65,18 +71,34 @@ public class MigrateReplicationQueueFromZkToTableProcedure private List disabledPeerIds; - private List> futures; + private CompletableFuture future; private ExecutorService executor; + private RetryCounter retryCounter; + @Override public String getGlobalId() { return getClass().getSimpleName(); } + private ProcedureSuspendedException suspend(Configuration conf, LongConsumer backoffConsumer) +throws ProcedureSuspendedException { +if (retryCounter == null) { + retryCounter = ProcedureUtil.createRetryCounter(conf); +} +long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); +backoffConsumer.accept(backoff); +throw suspend(Math.toIntExact(backoff), true); + } + + private void resetRetry() { +retryCounter = null; + } + private ExecutorService getExecutorService() { if (executor == null) { - executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder() + executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
[hbase] 09/10: HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure
This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch HBASE-27109/table_based_rqs in repository https://gitbox.apache.org/repos/asf/hbase.git commit 3c54d9c27a9a6e147580cbeab67186b18019785b Author: Duo Zhang AuthorDate: Tue Oct 18 16:46:03 2022 +0800 HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure Signed-off-by: Liangjun He --- .../hbase/procedure2/TimeoutExecutorThread.java| 10 +- ...rateReplicationQueueFromZkToTableProcedure.java | 131 ++--- .../master/replication/ReplicationPeerManager.java | 45 --- ...tReplicationPeerManagerMigrateQueuesFromZk.java | 9 +- 4 files changed, 125 insertions(+), 70 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java index 3b99781a558..c0287a99435 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java @@ -78,9 +78,13 @@ class TimeoutExecutorThread extends StoppableThread { } public void add(Procedure procedure) { -LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(), - procedure.getTimeoutTimestamp()); -queue.add(new DelayedProcedure<>(procedure)); +if (procedure.getTimeout() > 0) { + LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(), +procedure.getTimeoutTimestamp()); + queue.add(new DelayedProcedure<>(procedure)); +} else { + LOG.info("Got negative timeout {} for {}, skip adding", procedure.getTimeout(), procedure); +} } public boolean remove(Procedure procedure) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java index 536f232338e..93ff27db3f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java @@ -25,19 +25,25 @@ import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureP import java.io.IOException; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.function.LongConsumer; import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration; +import org.apache.hadoop.hbase.util.FutureUtils; +import org.apache.hadoop.hbase.util.IdLock; +import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; @@ -65,18 +71,34 @@ public class MigrateReplicationQueueFromZkToTableProcedure private List disabledPeerIds; - private List> futures; + private CompletableFuture future; private ExecutorService executor; + private RetryCounter retryCounter; + @Override public String getGlobalId() { return getClass().getSimpleName(); } + private ProcedureSuspendedException suspend(Configuration conf, LongConsumer backoffConsumer) +throws ProcedureSuspendedException { +if (retryCounter == null) { + retryCounter = ProcedureUtil.createRetryCounter(conf); +} +long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); +backoffConsumer.accept(backoff); +throw suspend(Math.toIntExact(backoff), true); + } + + private void resetRetry() { +retryCounter = null; + } + private ExecutorService getExecutorService() { if (executor == null) { - executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder() + executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
[hbase] 09/10: HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure
This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch HBASE-27109/table_based_rqs in repository https://gitbox.apache.org/repos/asf/hbase.git commit 32fd20812ed1291b8e83bb5c3b3c9fe8358c4251 Author: Duo Zhang AuthorDate: Tue Oct 18 16:46:03 2022 +0800 HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure Signed-off-by: Liangjun He --- .../hbase/procedure2/TimeoutExecutorThread.java| 10 +- ...rateReplicationQueueFromZkToTableProcedure.java | 131 ++--- .../master/replication/ReplicationPeerManager.java | 45 --- ...tReplicationPeerManagerMigrateQueuesFromZk.java | 9 +- 4 files changed, 125 insertions(+), 70 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java index 3b99781a558..c0287a99435 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java @@ -78,9 +78,13 @@ class TimeoutExecutorThread extends StoppableThread { } public void add(Procedure procedure) { -LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(), - procedure.getTimeoutTimestamp()); -queue.add(new DelayedProcedure<>(procedure)); +if (procedure.getTimeout() > 0) { + LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(), +procedure.getTimeoutTimestamp()); + queue.add(new DelayedProcedure<>(procedure)); +} else { + LOG.info("Got negative timeout {} for {}, skip adding", procedure.getTimeout(), procedure); +} } public boolean remove(Procedure procedure) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java index 536f232338e..93ff27db3f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java @@ -25,19 +25,25 @@ import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureP import java.io.IOException; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.function.LongConsumer; import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration; +import org.apache.hadoop.hbase.util.FutureUtils; +import org.apache.hadoop.hbase.util.IdLock; +import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; @@ -65,18 +71,34 @@ public class MigrateReplicationQueueFromZkToTableProcedure private List disabledPeerIds; - private List> futures; + private CompletableFuture future; private ExecutorService executor; + private RetryCounter retryCounter; + @Override public String getGlobalId() { return getClass().getSimpleName(); } + private ProcedureSuspendedException suspend(Configuration conf, LongConsumer backoffConsumer) +throws ProcedureSuspendedException { +if (retryCounter == null) { + retryCounter = ProcedureUtil.createRetryCounter(conf); +} +long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); +backoffConsumer.accept(backoff); +throw suspend(Math.toIntExact(backoff), true); + } + + private void resetRetry() { +retryCounter = null; + } + private ExecutorService getExecutorService() { if (executor == null) { - executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder() + executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
[hbase] 09/10: HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure
This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch HBASE-27109/table_based_rqs in repository https://gitbox.apache.org/repos/asf/hbase.git commit 6085f14d3238dd5935084aedeaef348cd06f4222 Author: Duo Zhang AuthorDate: Tue Oct 18 16:46:03 2022 +0800 HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure Signed-off-by: Liangjun He --- .../hbase/procedure2/TimeoutExecutorThread.java| 10 +- ...rateReplicationQueueFromZkToTableProcedure.java | 131 ++--- .../master/replication/ReplicationPeerManager.java | 45 --- ...tReplicationPeerManagerMigrateQueuesFromZk.java | 9 +- 4 files changed, 125 insertions(+), 70 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java index 3b99781a558..c0287a99435 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java @@ -78,9 +78,13 @@ class TimeoutExecutorThread extends StoppableThread { } public void add(Procedure procedure) { -LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(), - procedure.getTimeoutTimestamp()); -queue.add(new DelayedProcedure<>(procedure)); +if (procedure.getTimeout() > 0) { + LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(), +procedure.getTimeoutTimestamp()); + queue.add(new DelayedProcedure<>(procedure)); +} else { + LOG.info("Got negative timeout {} for {}, skip adding", procedure.getTimeout(), procedure); +} } public boolean remove(Procedure procedure) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java index 536f232338e..93ff27db3f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java @@ -25,19 +25,25 @@ import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureP import java.io.IOException; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.function.LongConsumer; import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration; +import org.apache.hadoop.hbase.util.FutureUtils; +import org.apache.hadoop.hbase.util.IdLock; +import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; @@ -65,18 +71,34 @@ public class MigrateReplicationQueueFromZkToTableProcedure private List disabledPeerIds; - private List> futures; + private CompletableFuture future; private ExecutorService executor; + private RetryCounter retryCounter; + @Override public String getGlobalId() { return getClass().getSimpleName(); } + private ProcedureSuspendedException suspend(Configuration conf, LongConsumer backoffConsumer) +throws ProcedureSuspendedException { +if (retryCounter == null) { + retryCounter = ProcedureUtil.createRetryCounter(conf); +} +long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); +backoffConsumer.accept(backoff); +throw suspend(Math.toIntExact(backoff), true); + } + + private void resetRetry() { +retryCounter = null; + } + private ExecutorService getExecutorService() { if (executor == null) { - executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder() + executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
[hbase] 09/10: HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure
This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch HBASE-27109/table_based_rqs in repository https://gitbox.apache.org/repos/asf/hbase.git commit 0c27c60e12f377802dbad4b29ebde5a319a7fef8 Author: Duo Zhang AuthorDate: Tue Oct 18 16:46:03 2022 +0800 HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure Signed-off-by: Liangjun He --- .../hbase/procedure2/TimeoutExecutorThread.java| 10 +- ...rateReplicationQueueFromZkToTableProcedure.java | 131 ++--- .../master/replication/ReplicationPeerManager.java | 45 --- ...tReplicationPeerManagerMigrateQueuesFromZk.java | 9 +- 4 files changed, 125 insertions(+), 70 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java index 3b99781a558..c0287a99435 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java @@ -78,9 +78,13 @@ class TimeoutExecutorThread extends StoppableThread { } public void add(Procedure procedure) { -LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(), - procedure.getTimeoutTimestamp()); -queue.add(new DelayedProcedure<>(procedure)); +if (procedure.getTimeout() > 0) { + LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(), +procedure.getTimeoutTimestamp()); + queue.add(new DelayedProcedure<>(procedure)); +} else { + LOG.info("Got negative timeout {} for {}, skip adding", procedure.getTimeout(), procedure); +} } public boolean remove(Procedure procedure) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java index 536f232338e..93ff27db3f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java @@ -25,19 +25,25 @@ import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureP import java.io.IOException; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.function.LongConsumer; import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration; +import org.apache.hadoop.hbase.util.FutureUtils; +import org.apache.hadoop.hbase.util.IdLock; +import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; @@ -65,18 +71,34 @@ public class MigrateReplicationQueueFromZkToTableProcedure private List disabledPeerIds; - private List> futures; + private CompletableFuture future; private ExecutorService executor; + private RetryCounter retryCounter; + @Override public String getGlobalId() { return getClass().getSimpleName(); } + private ProcedureSuspendedException suspend(Configuration conf, LongConsumer backoffConsumer) +throws ProcedureSuspendedException { +if (retryCounter == null) { + retryCounter = ProcedureUtil.createRetryCounter(conf); +} +long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); +backoffConsumer.accept(backoff); +throw suspend(Math.toIntExact(backoff), true); + } + + private void resetRetry() { +retryCounter = null; + } + private ExecutorService getExecutorService() { if (executor == null) { - executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder() + executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
[hbase] 09/10: HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure
This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch HBASE-27109/table_based_rqs in repository https://gitbox.apache.org/repos/asf/hbase.git commit f852b3f8cb25439cfe6330ff65f0764ef3b42c8c Author: Duo Zhang AuthorDate: Tue Oct 18 16:46:03 2022 +0800 HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure Signed-off-by: Liangjun He --- .../hbase/procedure2/TimeoutExecutorThread.java| 10 +- ...rateReplicationQueueFromZkToTableProcedure.java | 131 ++--- .../master/replication/ReplicationPeerManager.java | 45 --- ...tReplicationPeerManagerMigrateQueuesFromZk.java | 9 +- 4 files changed, 125 insertions(+), 70 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java index 3b99781a558..c0287a99435 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java @@ -78,9 +78,13 @@ class TimeoutExecutorThread extends StoppableThread { } public void add(Procedure procedure) { -LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(), - procedure.getTimeoutTimestamp()); -queue.add(new DelayedProcedure<>(procedure)); +if (procedure.getTimeout() > 0) { + LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(), +procedure.getTimeoutTimestamp()); + queue.add(new DelayedProcedure<>(procedure)); +} else { + LOG.info("Got negative timeout {} for {}, skip adding", procedure.getTimeout(), procedure); +} } public boolean remove(Procedure procedure) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java index 536f232338e..93ff27db3f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java @@ -25,19 +25,25 @@ import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureP import java.io.IOException; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.function.LongConsumer; import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration; +import org.apache.hadoop.hbase.util.FutureUtils; +import org.apache.hadoop.hbase.util.IdLock; +import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; @@ -65,18 +71,34 @@ public class MigrateReplicationQueueFromZkToTableProcedure private List disabledPeerIds; - private List> futures; + private CompletableFuture future; private ExecutorService executor; + private RetryCounter retryCounter; + @Override public String getGlobalId() { return getClass().getSimpleName(); } + private ProcedureSuspendedException suspend(Configuration conf, LongConsumer backoffConsumer) +throws ProcedureSuspendedException { +if (retryCounter == null) { + retryCounter = ProcedureUtil.createRetryCounter(conf); +} +long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); +backoffConsumer.accept(backoff); +throw suspend(Math.toIntExact(backoff), true); + } + + private void resetRetry() { +retryCounter = null; + } + private ExecutorService getExecutorService() { if (executor == null) { - executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder() + executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
[hbase] 09/10: HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure
This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch HBASE-27109/table_based_rqs in repository https://gitbox.apache.org/repos/asf/hbase.git commit ac878a550d23e1e4210fac0a4a6cecc0e8a5cb1c Author: Duo Zhang AuthorDate: Tue Oct 18 16:46:03 2022 +0800 HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure Signed-off-by: Liangjun He --- .../hbase/procedure2/TimeoutExecutorThread.java| 10 +- ...rateReplicationQueueFromZkToTableProcedure.java | 131 ++--- .../master/replication/ReplicationPeerManager.java | 45 --- ...tReplicationPeerManagerMigrateQueuesFromZk.java | 9 +- 4 files changed, 125 insertions(+), 70 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java index 3b99781a558..c0287a99435 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java @@ -78,9 +78,13 @@ class TimeoutExecutorThread extends StoppableThread { } public void add(Procedure procedure) { -LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(), - procedure.getTimeoutTimestamp()); -queue.add(new DelayedProcedure<>(procedure)); +if (procedure.getTimeout() > 0) { + LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(), +procedure.getTimeoutTimestamp()); + queue.add(new DelayedProcedure<>(procedure)); +} else { + LOG.info("Got negative timeout {} for {}, skip adding", procedure.getTimeout(), procedure); +} } public boolean remove(Procedure procedure) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java index 536f232338e..93ff27db3f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java @@ -25,19 +25,25 @@ import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureP import java.io.IOException; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.function.LongConsumer; import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration; +import org.apache.hadoop.hbase.util.FutureUtils; +import org.apache.hadoop.hbase.util.IdLock; +import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; @@ -65,18 +71,34 @@ public class MigrateReplicationQueueFromZkToTableProcedure private List disabledPeerIds; - private List> futures; + private CompletableFuture future; private ExecutorService executor; + private RetryCounter retryCounter; + @Override public String getGlobalId() { return getClass().getSimpleName(); } + private ProcedureSuspendedException suspend(Configuration conf, LongConsumer backoffConsumer) +throws ProcedureSuspendedException { +if (retryCounter == null) { + retryCounter = ProcedureUtil.createRetryCounter(conf); +} +long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); +backoffConsumer.accept(backoff); +throw suspend(Math.toIntExact(backoff), true); + } + + private void resetRetry() { +retryCounter = null; + } + private ExecutorService getExecutorService() { if (executor == null) { - executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder() + executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
[hbase] 09/10: HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure
This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch HBASE-27109/table_based_rqs in repository https://gitbox.apache.org/repos/asf/hbase.git commit 233b2e88518a44bd6bc4dd2ef407f1ae822e4a3c Author: Duo Zhang AuthorDate: Tue Oct 18 16:46:03 2022 +0800 HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure Signed-off-by: Liangjun He --- .../hbase/procedure2/TimeoutExecutorThread.java| 10 +- ...rateReplicationQueueFromZkToTableProcedure.java | 131 ++--- .../master/replication/ReplicationPeerManager.java | 45 --- ...tReplicationPeerManagerMigrateQueuesFromZk.java | 9 +- 4 files changed, 125 insertions(+), 70 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java index 3b99781a558..c0287a99435 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java @@ -78,9 +78,13 @@ class TimeoutExecutorThread extends StoppableThread { } public void add(Procedure procedure) { -LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(), - procedure.getTimeoutTimestamp()); -queue.add(new DelayedProcedure<>(procedure)); +if (procedure.getTimeout() > 0) { + LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(), +procedure.getTimeoutTimestamp()); + queue.add(new DelayedProcedure<>(procedure)); +} else { + LOG.info("Got negative timeout {} for {}, skip adding", procedure.getTimeout(), procedure); +} } public boolean remove(Procedure procedure) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java index 536f232338e..93ff27db3f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java @@ -25,19 +25,25 @@ import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureP import java.io.IOException; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.function.LongConsumer; import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration; +import org.apache.hadoop.hbase.util.FutureUtils; +import org.apache.hadoop.hbase.util.IdLock; +import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; @@ -65,18 +71,34 @@ public class MigrateReplicationQueueFromZkToTableProcedure private List disabledPeerIds; - private List> futures; + private CompletableFuture future; private ExecutorService executor; + private RetryCounter retryCounter; + @Override public String getGlobalId() { return getClass().getSimpleName(); } + private ProcedureSuspendedException suspend(Configuration conf, LongConsumer backoffConsumer) +throws ProcedureSuspendedException { +if (retryCounter == null) { + retryCounter = ProcedureUtil.createRetryCounter(conf); +} +long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); +backoffConsumer.accept(backoff); +throw suspend(Math.toIntExact(backoff), true); + } + + private void resetRetry() { +retryCounter = null; + } + private ExecutorService getExecutorService() { if (executor == null) { - executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder() + executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
[hbase] 09/10: HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure
This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch HBASE-27109/table_based_rqs in repository https://gitbox.apache.org/repos/asf/hbase.git commit d218c775beb4538b7a7a4d64347b469f310903ed Author: Duo Zhang AuthorDate: Tue Oct 18 16:46:03 2022 +0800 HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure Signed-off-by: Liangjun He --- .../hbase/procedure2/TimeoutExecutorThread.java| 10 +- ...rateReplicationQueueFromZkToTableProcedure.java | 131 ++--- .../master/replication/ReplicationPeerManager.java | 45 --- ...tReplicationPeerManagerMigrateQueuesFromZk.java | 9 +- 4 files changed, 125 insertions(+), 70 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java index 3b99781a558..c0287a99435 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java @@ -78,9 +78,13 @@ class TimeoutExecutorThread extends StoppableThread { } public void add(Procedure procedure) { -LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(), - procedure.getTimeoutTimestamp()); -queue.add(new DelayedProcedure<>(procedure)); +if (procedure.getTimeout() > 0) { + LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(), +procedure.getTimeoutTimestamp()); + queue.add(new DelayedProcedure<>(procedure)); +} else { + LOG.info("Got negative timeout {} for {}, skip adding", procedure.getTimeout(), procedure); +} } public boolean remove(Procedure procedure) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java index 536f232338e..93ff27db3f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java @@ -25,19 +25,25 @@ import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureP import java.io.IOException; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.function.LongConsumer; import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration; +import org.apache.hadoop.hbase.util.FutureUtils; +import org.apache.hadoop.hbase.util.IdLock; +import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; @@ -65,18 +71,34 @@ public class MigrateReplicationQueueFromZkToTableProcedure private List disabledPeerIds; - private List> futures; + private CompletableFuture future; private ExecutorService executor; + private RetryCounter retryCounter; + @Override public String getGlobalId() { return getClass().getSimpleName(); } + private ProcedureSuspendedException suspend(Configuration conf, LongConsumer backoffConsumer) +throws ProcedureSuspendedException { +if (retryCounter == null) { + retryCounter = ProcedureUtil.createRetryCounter(conf); +} +long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); +backoffConsumer.accept(backoff); +throw suspend(Math.toIntExact(backoff), true); + } + + private void resetRetry() { +retryCounter = null; + } + private ExecutorService getExecutorService() { if (executor == null) { - executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder() + executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()