This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c08c3cf3576b62aa1cc5d6976b229e20a1495ed5 Author: Qiang Zhao <[email protected]> AuthorDate: Mon Dec 30 23:57:27 2024 +0800 [fix][broker] topic policy deadlock block metadata thread. (#23786) (cherry picked from commit 86f8a84409116e5e42a9463189c0dedaf5cd291a) --- .../SystemTopicBasedTopicPoliciesService.java | 55 +++++++++++----------- 1 file changed, 27 insertions(+), 28 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index cc3938491e6..5488d5563f6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -37,6 +37,9 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nonnull; import org.apache.commons.lang3.concurrent.ConcurrentInitializer; import org.apache.commons.lang3.concurrent.LazyInitializer; +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.commons.lang3.mutable.MutableObject; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener; @@ -267,37 +270,33 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic return CompletableFuture.completedFuture(Optional.empty()); } final CompletableFuture<Boolean> preparedFuture = prepareInitPoliciesCacheAsync(topicName.getNamespaceObject()); - final var resultFuture = new CompletableFuture<Optional<TopicPolicies>>(); - preparedFuture.thenAccept(inserted -> policyCacheInitMap.compute(namespace, (___, existingFuture) -> { - if (!inserted || existingFuture != null) { - final var partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName()); - final var policies = Optional.ofNullable(switch (type) { - case DEFAULT -> Optional.ofNullable(policiesCache.get(partitionedTopicName)) - .orElseGet(() -> globalPoliciesCache.get(partitionedTopicName)); - case GLOBAL_ONLY -> globalPoliciesCache.get(partitionedTopicName); - case LOCAL_ONLY -> policiesCache.get(partitionedTopicName); - }); - resultFuture.complete(policies); - } else { - CompletableFuture.runAsync(() -> { - log.info("The future of {} has been removed from cache, retry getTopicPolicies again", namespace); - // Call it in another thread to avoid recursive update because getTopicPoliciesAsync() could call - // policyCacheInitMap.computeIfAbsent() - getTopicPoliciesAsync(topicName, type).whenComplete((result, e) -> { - if (e == null) { - resultFuture.complete(result); - } else { - resultFuture.completeExceptionally(e); - } + // switch thread to avoid potential metadata thread cost and recursive deadlock + return preparedFuture.thenComposeAsync(inserted -> { + // initialized : policies + final Mutable<Pair<Boolean, Optional<TopicPolicies>>> policiesFutureHolder = new MutableObject<>(); + // NOTICE: avoid using any callback with lock scope to avoid deadlock + policyCacheInitMap.compute(namespace, (___, existingFuture) -> { + if (!inserted || existingFuture != null) { + final var partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName()); + final var policies = Optional.ofNullable(switch (type) { + case DEFAULT -> Optional.ofNullable(policiesCache.get(partitionedTopicName)) + .orElseGet(() -> globalPoliciesCache.get(partitionedTopicName)); + case GLOBAL_ONLY -> globalPoliciesCache.get(partitionedTopicName); + case LOCAL_ONLY -> policiesCache.get(partitionedTopicName); }); - }); + policiesFutureHolder.setValue(Pair.of(true, policies)); + } else { + policiesFutureHolder.setValue(Pair.of(false, null)); + } + return existingFuture; + }); + final var p = policiesFutureHolder.getValue(); + if (!p.getLeft()) { + log.info("The future of {} has been removed from cache, retry getTopicPolicies again", namespace); + return getTopicPoliciesAsync(topicName, type); } - return existingFuture; - })).exceptionally(e -> { - resultFuture.completeExceptionally(e); - return null; + return CompletableFuture.completedFuture(p.getRight()); }); - return resultFuture; } public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
