This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0868e21eb347f6c50302c8504f6bd640c1013101 Author: 道君- Tao Jiuming <[email protected]> AuthorDate: Tue Sep 9 11:31:12 2025 +0800 [improve][broker] Optimize Reader creation in TopicPoliciesService (#24658) Co-authored-by: Zixuan Liu <[email protected]> (cherry picked from commit 0cda4f400b0cb785447fc1a64edb4f97c17c309a) --- .../SystemTopicBasedTopicPoliciesService.java | 93 ++++++++++++++++++---- .../pulsar/broker/admin/TopicPoliciesTest.java | 6 ++ 2 files changed, 84 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index c745d591a4e..8287583a3d7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -579,8 +579,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic return policyCacheInitMap.computeIfAbsent(namespace, (k) -> { final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture = - createSystemTopicClient(namespace); - readerCaches.put(namespace, readerCompletableFuture); + newReader(namespace); final CompletableFuture<Void> initFuture = readerCompletableFuture .thenCompose(reader -> { final CompletableFuture<Void> stageFuture = new CompletableFuture<>(); @@ -594,9 +593,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic if (closed.get()) { return null; } - log.error("[{}] Failed to create reader on __change_events topic", - namespace, ex); - cleanCacheAndCloseReader(namespace, false); + cleanPoliciesCacheInitMap( + namespace, readerCompletableFuture.isCompletedExceptionally()); } catch (Throwable cleanupEx) { // Adding this catch to avoid break callback chain log.error("[{}] Failed to cleanup reader on __change_events topic", @@ -610,6 +608,20 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic }); } + private CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> newReader(NamespaceName ns) { + return readerCaches.compute(ns, (__, existingFuture) -> { + if (existingFuture == null) { + return createSystemTopicClient(ns); + } + + if (existingFuture.isDone() && existingFuture.isCompletedExceptionally()) { + return existingFuture.exceptionallyCompose(ex -> + isAlreadyClosedException(ex) ? existingFuture : createSystemTopicClient(ns)); + } + return existingFuture; + }); + } + protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> createSystemTopicClient( NamespaceName namespace) { if (closed.get()) { @@ -633,7 +645,9 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic } AtomicInteger bundlesCount = ownedBundlesCountPerNamespace.get(namespace); if (bundlesCount == null || bundlesCount.decrementAndGet() <= 0) { - cleanCacheAndCloseReader(namespace, true, true); + cleanPoliciesCacheInitMap(namespace, true); + cleanWriterCache(namespace); + cleanOwnedBundlesCount(namespace); } } @@ -665,7 +679,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, CompletableFuture<Void> future) { if (closed.get()) { future.completeExceptionally(new BrokerServiceException(getClass().getName() + " is closed.")); - cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false); + cleanPoliciesCacheInitMap(reader.getSystemTopic().getTopicName().getNamespaceObject(), true); return; } reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> { @@ -673,7 +687,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic log.error("[{}] Failed to check the move events for the system topic", reader.getSystemTopic().getTopicName(), ex); future.completeExceptionally(ex); - cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false); + cleanPoliciesCacheInitMap(reader.getSystemTopic().getTopicName().getNamespaceObject(), + isAlreadyClosedException(ex)); return; } if (hasMore) { @@ -692,7 +707,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic log.error("[{}] Failed to read event from the system topic.", reader.getSystemTopic().getTopicName(), e); future.completeExceptionally(e); - cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false); + cleanPoliciesCacheInitMap(reader.getSystemTopic().getTopicName().getNamespaceObject(), + isAlreadyClosedException(ex)); return null; }); } else { @@ -718,10 +734,45 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic }); } - private void cleanCacheAndCloseReader(@NonNull NamespaceName namespace, boolean cleanOwnedBundlesCount) { - cleanCacheAndCloseReader(namespace, cleanOwnedBundlesCount, false); + + private void cleanPoliciesCacheInitMap(@NonNull NamespaceName namespace, boolean closeReader) { + if (!closeReader) { + policyCacheInitMap.remove(namespace); + return; + } + + TopicPolicyMessageHandlerTracker topicPolicyMessageHandlerTracker = + topicPolicyMessageHandlerTrackers.remove(namespace); + if (topicPolicyMessageHandlerTracker != null) { + topicPolicyMessageHandlerTracker.close(); + } + + CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture = readerCaches.remove(namespace); + policyCacheInitMap.compute(namespace, (k, v) -> { + policiesCache.entrySet().removeIf(entry -> Objects.equals(entry.getKey().getNamespaceObject(), namespace)); + globalPoliciesCache.entrySet() + .removeIf(entry -> Objects.equals(entry.getKey().getNamespaceObject(), namespace)); + return null; + }); + if (readerFuture != null && !readerFuture.isCompletedExceptionally()) { + readerFuture + .thenCompose(SystemTopicClient.Reader::closeAsync) + .exceptionally(ex -> { + log.warn("[{}] Close change_event reader fail.", namespace, ex); + return null; + }); + } + } + + private void cleanWriterCache(@NonNull NamespaceName namespace) { + writerCaches.synchronous().invalidate(namespace); } + private void cleanOwnedBundlesCount(@NonNull NamespaceName namespace) { + ownedBundlesCountPerNamespace.remove(namespace); + } + + private void cleanCacheAndCloseReader(@NonNull NamespaceName namespace, boolean cleanOwnedBundlesCount, boolean cleanWriterCache) { if (cleanWriterCache) { @@ -754,6 +805,9 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic }); } + + + /** * This is an async method for the background reader to continue syncing new messages. * @@ -763,7 +817,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic private void readMorePoliciesAsync(SystemTopicClient.Reader<PulsarEvent> reader) { NamespaceName namespaceObject = reader.getSystemTopic().getTopicName().getNamespaceObject(); if (closed.get()) { - cleanCacheAndCloseReader(namespaceObject, false); + cleanPoliciesCacheInitMap(namespaceObject, true); return; } reader.readNextAsync() @@ -784,11 +838,10 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic if (ex == null) { readMorePoliciesAsync(reader); } else { - Throwable cause = FutureUtil.unwrapCompletionException(ex); - if (cause instanceof PulsarClientException.AlreadyClosedException) { + if (isAlreadyClosedException(ex)) { log.info("Closing the topic policies reader for {}", reader.getSystemTopic().getTopicName()); - cleanCacheAndCloseReader(namespaceObject, false); + cleanPoliciesCacheInitMap(namespaceObject, true); } else { log.warn("Read more topic polices exception, read again.", ex); readMorePoliciesAsync(reader); @@ -797,6 +850,11 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic }); } + private boolean isAlreadyClosedException(Throwable ex) { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + return cause instanceof PulsarClientException.AlreadyClosedException; + } + private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) { // delete policies if (msg.getValue() == null) { @@ -884,6 +942,11 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic } + @VisibleForTesting + public Map<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>> getReaderCaches() { + return readerCaches; + } + @VisibleForTesting long getPoliciesCacheSize() { return policiesCache.size(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index f8c3f580fc1..8d8e7c385b5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -64,6 +64,7 @@ import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter; +import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; @@ -77,6 +78,7 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.api.proto.CommandSubscribe; +import org.apache.pulsar.common.events.PulsarEvent; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicDomain; @@ -3578,6 +3580,10 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { policyCacheInitMap.clear(); policiesCache.clear(); globalPoliciesCache.clear(); + + Map<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>> readerCaches = + ((SystemTopicBasedTopicPoliciesService) topicPoliciesService).getReaderCaches(); + readerCaches.clear(); } @DataProvider(name = "reloadPolicyTypes")
