Copilot commented on code in PR #25272:
URL: https://github.com/apache/pulsar/pull/25272#discussion_r2878353712
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -1885,39 +1854,51 @@ private boolean checkQuotas(Policies policies,
RetentionPolicies retention) {
return checkBacklogQuota(quota, retention);
}
- private void clearBacklog(NamespaceName nsName, String bundleRange, String
subscription) {
- try {
- List<Topic> topicList =
pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(nsName.toString(),
- nsName.toString() + "/" + bundleRange);
-
- List<CompletableFuture<Void>> futures = new ArrayList<>();
- if (subscription != null) {
- if
(subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
- subscription =
PersistentReplicator.getRemoteCluster(subscription);
- }
- for (Topic topic : topicList) {
- if (topic instanceof PersistentTopic
- &&
!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic.getName()))) {
- futures.add(((PersistentTopic)
topic).clearBacklog(subscription));
+ private CompletableFuture<Void> clearBacklogAsync(NamespaceName nsName,
String bundleRange, String subscription) {
+ final NamespaceBundleFactory bundleFactory =
pulsar().getNamespaceService().getNamespaceBundleFactory();
+ final NamespaceBundle targetBundle =
bundleFactory.getBundle(nsName.toString(), bundleRange);
+ return pulsar().getNamespaceService().getListOfPersistentTopics(nsName)
Review Comment:
The `clearBacklogAsync` method calls `getListOfPersistentTopics(nsName)` to
retrieve all persistent topics in the entire namespace, then filters to only
those belonging to `targetBundle`. For large namespaces with many topics spread
across many bundles, this fetches all topic metadata unnecessarily. An
optimization would be to start with the already-owned topics (from
`getAllTopicsFromNamespaceBundle`) if the bundle IS owned, and only fall back
to querying all namespace topics when the bundle is unloaded. The current
approach is correct but potentially inefficient for large namespaces where the
bundle is already loaded.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -1885,39 +1854,51 @@ private boolean checkQuotas(Policies policies,
RetentionPolicies retention) {
return checkBacklogQuota(quota, retention);
}
- private void clearBacklog(NamespaceName nsName, String bundleRange, String
subscription) {
- try {
- List<Topic> topicList =
pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(nsName.toString(),
- nsName.toString() + "/" + bundleRange);
-
- List<CompletableFuture<Void>> futures = new ArrayList<>();
- if (subscription != null) {
- if
(subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
- subscription =
PersistentReplicator.getRemoteCluster(subscription);
- }
- for (Topic topic : topicList) {
- if (topic instanceof PersistentTopic
- &&
!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic.getName()))) {
- futures.add(((PersistentTopic)
topic).clearBacklog(subscription));
+ private CompletableFuture<Void> clearBacklogAsync(NamespaceName nsName,
String bundleRange, String subscription) {
+ final NamespaceBundleFactory bundleFactory =
pulsar().getNamespaceService().getNamespaceBundleFactory();
+ final NamespaceBundle targetBundle =
bundleFactory.getBundle(nsName.toString(), bundleRange);
+ return pulsar().getNamespaceService().getListOfPersistentTopics(nsName)
+ .thenCompose(topicsInNamespace -> {
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ String effectiveSubscription = subscription;
+ if (effectiveSubscription != null
+ &&
effectiveSubscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix()))
{
+ effectiveSubscription =
PersistentReplicator.getRemoteCluster(effectiveSubscription);
}
- }
- } else {
- for (Topic topic : topicList) {
- if (topic instanceof PersistentTopic
- &&
!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic.getName()))) {
- futures.add(((PersistentTopic) topic).clearBacklog());
+ final String finalSubscription = effectiveSubscription;
+
+ for (String topic : topicsInNamespace) {
+ TopicName topicName = TopicName.get(topic);
+ if
(pulsar().getBrokerService().isSystemTopic(topicName)) {
+ continue;
+ }
+ NamespaceBundle bundle =
bundleFactory.getBundle(topicName);
+ if (bundle == null || !bundle.equals(targetBundle)) {
Review Comment:
The call `bundleFactory.getBundle(topicName)` at this line uses
`bundlesCache.synchronous().get(namespace)`, which is a blocking call that
could block the current thread until the cache is loaded from the metadata
store. Since this executes inside a `thenCompose` lambda of an async pipeline,
blocking an async thread could degrade throughput for concurrent admin
requests. Consider using the async variant
`bundleFactory.getBundlesAsync(topicName.getNamespaceObject()).thenApply(bundles
-> bundles.findBundle(topicName))` and restructuring the loop as a stream of
CompletableFutures to keep the pipeline fully non-blocking.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]