This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b0542b3312e [improve] Retry re-validating ResourceLock with backoff 
after errors (#22617)
b0542b3312e is described below

commit b0542b3312ef915e7467d9550d19a3e7ca6aca99
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Tue May 7 10:35:23 2024 -0700

    [improve] Retry re-validating ResourceLock with backoff after errors 
(#22617)
---
 .../pulsar/broker/service/AbstractReplicator.java  |  2 +-
 .../service/PulsarMetadataEventSynchronizer.java   |  2 +-
 .../broker/service/TopicPoliciesService.java       |  4 +--
 .../PersistentDispatcherMultipleConsumers.java     |  2 +-
 .../PersistentDispatcherSingleActiveConsumer.java  |  2 +-
 .../service/persistent/PersistentReplicator.java   |  2 +-
 .../streamingdispatch/StreamingEntryReader.java    |  2 +-
 .../pendingack/impl/PendingAckHandleImpl.java      |  2 +-
 .../common/naming/NamespaceBundleFactory.java      |  2 +-
 .../SystemTopicBasedTopicPoliciesServiceTest.java  |  4 +--
 .../pulsar/client/impl/ConnectionHandlerTest.java  |  2 ++
 .../apache/pulsar/client/impl/RetryUtilTest.java   |  2 ++
 .../client/impl/BinaryProtoLookupService.java      |  2 ++
 .../pulsar/client/impl/ConnectionHandler.java      |  1 +
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  2 ++
 .../impl/PatternMultiTopicsConsumerImpl.java       |  2 ++
 .../apache/pulsar/client/impl/ProducerImpl.java    |  1 +
 .../pulsar/client/impl/PulsarClientImpl.java       |  2 ++
 .../pulsar/client/impl/TopicListWatcher.java       |  1 +
 .../client/impl/TransactionMetaStoreHandler.java   |  2 ++
 .../org/apache/pulsar/client/util/RetryUtil.java   |  2 +-
 .../pulsar/client/impl/ConsumerImplTest.java       |  1 +
 .../org/apache/pulsar/common/util}/Backoff.java    |  2 +-
 .../apache/pulsar/common/util}/BackoffBuilder.java |  5 ++-
 .../apache/pulsar/common/util}/BackoffTest.java    |  2 +-
 .../coordination/impl/LockManagerImpl.java         | 10 +++---
 .../coordination/impl/ResourceLockImpl.java        | 37 +++++++++++++++++++---
 .../apache/pulsar/metadata/LockManagerTest.java    | 31 ++++++++++++++++++
 28 files changed, 106 insertions(+), 25 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index 902420e77b9..e9911a3c5be 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -36,10 +36,10 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.Backoff;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
index 80743e44ab7..0383a0b7552 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
@@ -33,8 +33,8 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.util.Backoff;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataEvent;
 import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index c09bab0a4b6..cfac6b396e8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -24,13 +24,13 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
-import org.apache.pulsar.client.impl.Backoff;
-import org.apache.pulsar.client.impl.BackoffBuilder;
 import org.apache.pulsar.client.util.RetryUtil;
 import org.apache.pulsar.common.classification.InterfaceStability;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.apache.pulsar.common.util.Backoff;
+import org.apache.pulsar.common.util.BackoffBuilder;
 import org.apache.pulsar.common.util.FutureUtil;
 
 /**
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 9d0dba798ad..2f99d57aa45 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -68,11 +68,11 @@ import 
org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
 import 
org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
-import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.Backoff;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index cc7b6841e5c..2830e700a62 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -50,8 +50,8 @@ import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
 import 
org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
-import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
+import org.apache.pulsar.common.util.Backoff;
 import org.apache.pulsar.common.util.Codec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 7ee1619e704..f5d0cc8aa9c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -55,7 +55,6 @@ import 
org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -64,6 +63,7 @@ import org.apache.pulsar.common.api.proto.MarkerType;
 import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.stats.Rate;
+import org.apache.pulsar.common.util.Backoff;
 import org.apache.pulsar.common.util.Codec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
index 6ffc5ba0f62..bed3bea236f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
@@ -37,7 +37,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import 
org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
-import org.apache.pulsar.client.impl.Backoff;
+import org.apache.pulsar.common.util.Backoff;
 
 /**
  * Entry reader that fulfill read request by streamline the read instead of 
reading with micro batch.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 5ed271c6fd4..9d07af4d26c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -59,11 +59,11 @@ import 
org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
 import 
org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
 import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
 import org.apache.pulsar.common.stats.PositionInPendingAckStats;
+import org.apache.pulsar.common.util.Backoff;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.RecoverTimeRecord;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
index 937d2763767..6f085cd5e2f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
@@ -50,10 +50,10 @@ import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
 import org.apache.pulsar.broker.resources.LocalPoliciesResources;
 import org.apache.pulsar.broker.resources.PulsarResources;
-import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.util.Backoff;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.policies.data.loadbalancer.BundleData;
 import org.apache.pulsar.stats.CacheMetricsCollector;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index e571da13435..f79c7412bc7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -45,8 +45,8 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCach
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.impl.Backoff;
-import org.apache.pulsar.client.impl.BackoffBuilder;
+import org.apache.pulsar.common.util.Backoff;
+import org.apache.pulsar.common.util.BackoffBuilder;
 import org.apache.pulsar.common.events.PulsarEvent;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
index f29d62db5f4..add0a5f3eef 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
@@ -32,6 +32,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.common.util.Backoff;
+import org.apache.pulsar.common.util.BackoffBuilder;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.awaitility.core.ConditionTimeoutException;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java
index 604c468b1de..58d1709cd1a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.client.impl;
 
 import org.apache.pulsar.client.util.RetryUtil;
+import org.apache.pulsar.common.util.Backoff;
+import org.apache.pulsar.common.util.BackoffBuilder;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.annotations.Test;
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 8ceb8e44975..9634c60f09a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -45,6 +45,8 @@ import 
org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.util.Backoff;
+import org.apache.pulsar.common.util.BackoffBuilder;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 77a85c255a8..6403d48d7be 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -29,6 +29,7 @@ import lombok.Setter;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.HandlerState.State;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.Backoff;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 468b5f7a8b8..bebc5601c6a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -117,6 +117,8 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.util.Backoff;
+import org.apache.pulsar.common.util.BackoffBuilder;
 import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
 import org.apache.pulsar.common.util.ExceptionHandler;
 import org.apache.pulsar.common.util.FutureUtil;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index 4d179f7d914..ec7ff7930c0 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -43,6 +43,8 @@ import org.apache.pulsar.common.lookup.GetTopicsResult;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.topics.TopicList;
+import org.apache.pulsar.common.util.Backoff;
+import org.apache.pulsar.common.util.BackoffBuilder;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index a41de6d1051..426ac5009b5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -91,6 +91,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaHash;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.util.BackoffBuilder;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.RelativeTimeUtil;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index fdabb5fa8cf..b92e039e5fa 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -84,6 +84,8 @@ import 
org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.topics.TopicList;
+import org.apache.pulsar.common.util.Backoff;
+import org.apache.pulsar.common.util.BackoffBuilder;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.slf4j.Logger;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
index 86adf69f06e..4e635e0d2e8 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.BackoffBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index ebbfca0c3cb..c27fe958ddd 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -46,6 +46,8 @@ import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.api.proto.Subscription;
 import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.Backoff;
+import org.apache.pulsar.common.util.BackoffBuilder;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java
index 93501d7b6c1..912cb7d7c58 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java
@@ -22,7 +22,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
-import org.apache.pulsar.client.impl.Backoff;
+import org.apache.pulsar.common.util.Backoff;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 9995246c175..0c47d17098e 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -49,6 +49,7 @@ import 
org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.client.util.ScheduledExecutorProvider;
+import org.apache.pulsar.common.util.Backoff;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java
similarity index 99%
rename from 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java
rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java
index daaf3499400..4eab85f3c41 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.impl;
+package org.apache.pulsar.common.util;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.time.Clock;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/BackoffBuilder.java
similarity index 91%
rename from 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java
rename to 
pulsar-common/src/main/java/org/apache/pulsar/common/util/BackoffBuilder.java
index 9913393fa9a..69b39030081 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/BackoffBuilder.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.impl;
+package org.apache.pulsar.common.util;
 
 import java.time.Clock;
 import java.util.concurrent.TimeUnit;
@@ -32,8 +32,11 @@ public class BackoffBuilder {
 
     public BackoffBuilder() {
         this.initial = 0;
+        this.unitInitial = TimeUnit.MILLISECONDS;
         this.max = 0;
+        this.unitMax = TimeUnit.MILLISECONDS;
         this.mandatoryStop = 0;
+        this.unitMandatoryStop = TimeUnit.MILLISECONDS;
         this.clock = Clock.systemDefaultZone();
     }
 
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BackoffTest.java 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java
similarity index 99%
rename from 
pulsar-client/src/test/java/org/apache/pulsar/client/impl/BackoffTest.java
rename to 
pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java
index 7f13acb7694..b3786236a70 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BackoffTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.impl;
+package org.apache.pulsar.common.util;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
index 4da6b7998a0..b6b5c57ccea 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
@@ -27,7 +27,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -52,7 +52,7 @@ class LockManagerImpl<T> implements LockManager<T> {
     private final MetadataCache<T> cache;
     private final MetadataSerde<T> serde;
     private final FutureUtil.Sequencer<Void> sequencer;
-    private final ExecutorService executor;
+    private final ScheduledExecutorService executor;
 
     private enum State {
         Ready, Closed
@@ -60,13 +60,13 @@ class LockManagerImpl<T> implements LockManager<T> {
 
     private State state = State.Ready;
 
-    LockManagerImpl(MetadataStoreExtended store, Class<T> clazz, 
ExecutorService executor) {
+    LockManagerImpl(MetadataStoreExtended store, Class<T> clazz, 
ScheduledExecutorService executor) {
         this(store, new JSONMetadataSerdeSimpleType<>(
                 TypeFactory.defaultInstance().constructSimpleType(clazz, 
null)),
                 executor);
     }
 
-    LockManagerImpl(MetadataStoreExtended store, MetadataSerde<T> serde, 
ExecutorService executor) {
+    LockManagerImpl(MetadataStoreExtended store, MetadataSerde<T> serde, 
ScheduledExecutorService executor) {
         this.store = store;
         this.cache = store.getMetadataCache(serde);
         this.serde = serde;
@@ -83,7 +83,7 @@ class LockManagerImpl<T> implements LockManager<T> {
 
     @Override
     public CompletableFuture<ResourceLock<T>> acquireLock(String path, T 
value) {
-        ResourceLockImpl<T> lock = new ResourceLockImpl<>(store, serde, path);
+        ResourceLockImpl<T> lock = new ResourceLockImpl<>(store, serde, path, 
executor);
 
         CompletableFuture<ResourceLock<T>> result = new CompletableFuture<>();
         lock.acquire(value).thenRun(() -> {
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
index 93c994b2436..692f224594c 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
@@ -21,8 +21,13 @@ package org.apache.pulsar.metadata.coordination.impl;
 import java.util.EnumSet;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.pulsar.common.util.Backoff;
+import org.apache.pulsar.common.util.BackoffBuilder;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataSerde;
@@ -44,7 +49,10 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
     private long version;
     private final CompletableFuture<Void> expiredFuture;
     private boolean revalidateAfterReconnection = false;
+    private final Backoff backoff;
     private final FutureUtil.Sequencer<Void> sequencer;
+    private final ScheduledExecutorService executor;
+    private ScheduledFuture<?> revalidateTask;
 
     private enum State {
         Init,
@@ -55,7 +63,8 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
 
     private State state;
 
-    public ResourceLockImpl(MetadataStoreExtended store, MetadataSerde<T> 
serde, String path) {
+    ResourceLockImpl(MetadataStoreExtended store, MetadataSerde<T> serde, 
String path,
+                     ScheduledExecutorService executor) {
         this.store = store;
         this.serde = serde;
         this.path = path;
@@ -63,6 +72,11 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
         this.expiredFuture = new CompletableFuture<>();
         this.sequencer = FutureUtil.Sequencer.create();
         this.state = State.Init;
+        this.executor = executor;
+        this.backoff = new BackoffBuilder()
+                .setInitialTime(100, TimeUnit.MILLISECONDS)
+                .setMax(60, TimeUnit.SECONDS)
+                .create();
     }
 
     @Override
@@ -93,6 +107,10 @@ public class ResourceLockImpl<T> implements ResourceLock<T> 
{
         }
 
         state = State.Releasing;
+        if (revalidateTask != null) {
+            revalidateTask.cancel(true);
+        }
+
         CompletableFuture<Void> result = new CompletableFuture<>();
 
         store.delete(path, Optional.of(version))
@@ -210,8 +228,15 @@ public class ResourceLockImpl<T> implements 
ResourceLock<T> {
      * This method is thread-safe and it will perform multiple re-validation 
operations in turn.
      */
     synchronized CompletableFuture<Void> silentRevalidateOnce() {
+        if (state != State.Valid) {
+            return CompletableFuture.completedFuture(null);
+        }
+
         return sequencer.sequential(() -> revalidate(value))
-                .thenRun(() -> log.info("Successfully revalidated the lock on 
{}", path))
+                .thenRun(() -> {
+                    log.info("Successfully revalidated the lock on {}", path);
+                    backoff.reset();
+                })
                 .exceptionally(ex -> {
                     synchronized (ResourceLockImpl.this) {
                         Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
@@ -225,8 +250,12 @@ public class ResourceLockImpl<T> implements 
ResourceLock<T> {
                             // Continue assuming we hold the lock, until we 
can revalidate it, either
                             // on Reconnected or SessionReestablished events.
                             revalidateAfterReconnection = true;
-                            log.warn("Failed to revalidate the lock at {}. 
Retrying later on reconnection {}", path,
-                                    realCause.getMessage());
+
+                            long delayMillis = backoff.next();
+                            log.warn("Failed to revalidate the lock at {}: {} 
- Retrying in {} seconds", path,
+                                    realCause.getMessage(), delayMillis / 
1000.0);
+                            revalidateTask =
+                                    
executor.schedule(this::silentRevalidateOnce, delayMillis, 
TimeUnit.MILLISECONDS);
                         }
                     }
                     return null;
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
index 05e6d4a3845..ebd60bad550 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 import lombok.Cleanup;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException.LockBusyException;
@@ -352,4 +353,34 @@ public class LockManagerTest extends BaseMetadataStoreTest 
{
             }
         });
     }
+
+    @Test(dataProvider = "impl")
+    public void lockDeletedAndReacquired(String provider, Supplier<String> 
urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = 
MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+        MetadataCache<String> cache = store.getMetadataCache(String.class);
+
+        @Cleanup
+        CoordinationService coordinationService = new 
CoordinationServiceImpl(store);
+
+        @Cleanup
+        LockManager<String> lockManager = 
coordinationService.getLockManager(String.class);
+
+        String key = newKey();
+        ResourceLock<String> lock = lockManager.acquireLock(key, 
"lock").join();
+        assertEquals(lock.getValue(), "lock");
+        var res = cache.get(key).join();
+        assertTrue(res.isPresent());
+        assertEquals(res.get(), "lock");
+
+        store.delete(key, Optional.empty()).join();
+
+        Awaitility.await().untilAsserted(() -> {
+            Optional<GetResult> val = store.get(key).join();
+            assertTrue(val.isPresent());
+            assertFalse(lock.getLockExpiredFuture().isDone());
+        });
+    }
 }


Reply via email to