lhotari commented on code in PR #25070:
URL: https://github.com/apache/pulsar/pull/25070#discussion_r2686797205
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:
##########
@@ -218,31 +278,103 @@ public void handleWatchTopicList(NamespaceName
namespaceName, long watcherId, lo
});
}
+ private void sendTopicListSuccessWithPermitAcquiringRetries(long
watcherId, long requestId,
+
Collection<String> topicList,
+ String hash,
+ Runnable
successfulCompletionCallback,
+ Runnable
failedCompletionCallback) {
+ performOperationWithPermitAcquiringRetries(watcherId, "topic list
success", permitAcquireErrorHandler ->
+ () -> connection.getCommandSender()
+ .sendWatchTopicListSuccess(requestId, watcherId, hash,
topicList, permitAcquireErrorHandler)
+ .whenComplete((__, t) -> {
+ if (t != null) {
+ // this is an unexpected case
+ log.warn("[{}] Failed to send topic list
success for watcherId={}. "
+ + "Watcher is not active.",
connection, watcherId, t);
+ failedCompletionCallback.run();
+ } else {
+ // completed successfully, run the callback
+ successfulCompletionCallback.run();
+ }
+ }));
+ }
+
/***
* @param topicsPattern The regexp for the topic name (not including the
partition suffix).
*/
public void
initializeTopicsListWatcher(CompletableFuture<TopicListWatcher> watcherFuture,
NamespaceName namespace, long watcherId, TopicsPattern
topicsPattern) {
- namespaceService.getListOfPersistentTopics(namespace).
- thenApply(topics -> {
- TopicListWatcher watcher = new TopicListWatcher(this,
watcherId, topicsPattern, topics);
- topicResources.registerPersistentTopicListener(namespace,
watcher);
- return watcher;
- }).
- whenComplete((watcher, exception) -> {
- if (exception != null) {
- watcherFuture.completeExceptionally(exception);
- } else {
- if (!watcherFuture.complete(watcher)) {
- log.warn("[{}] Watcher future was already
completed. Deregistering watcherId={}.",
- connection.toString(), watcherId);
-
topicResources.deregisterPersistentTopicListener(watcher);
- }
- }
- });
+ BooleanSupplier isPermitRequestCancelled = () ->
!connection.isActive() || !watchers.containsKey(watcherId);
+ if (isPermitRequestCancelled.getAsBoolean()) {
+ return;
+ }
+ TopicListSizeResultCache.ResultHolder listSizeHolder =
pulsar.getBrokerService().getTopicListSizeResultCache()
+ .getTopicListSize(namespace.toString(),
CommandGetTopicsOfNamespace.Mode.PERSISTENT);
+ AsyncDualMemoryLimiter maxTopicListInFlightLimiter =
pulsar.getBrokerService().getMaxTopicListInFlightLimiter();
+
+ listSizeHolder.getSizeAsync().thenCompose(initialSize -> {
+ // use heap size limiter to avoid broker getting overwhelmed by a
lot of concurrent topic list requests
+ return maxTopicListInFlightLimiter.withAcquiredPermits(initialSize,
+ AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY,
isPermitRequestCancelled, initialPermits -> {
+ AtomicReference<TopicListWatcher> watcherRef = new
AtomicReference<>();
+ return
namespaceService.getListOfPersistentTopics(namespace).thenCompose(topics -> {
+ long actualSize =
TopicListMemoryLimiter.estimateTopicListSize(topics);
+ listSizeHolder.updateSize(actualSize);
+ // register watcher immediately so that we don't
lose events
+ TopicListWatcher watcher =
+ new TopicListWatcher(this, watcherId,
topicsPattern, topics,
+ connection.ctx().executor());
+ watcherRef.set(watcher);
+
topicResources.registerPersistentTopicListener(namespace, watcher);
+ // use updated permits to slow down responses so
that backpressure gets applied
+ return
maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
+ isPermitRequestCancelled, updatedPermits
-> {
+ // reset retry backoff
+ retryBackoff.reset();
+ // just return the watcher which was
already created before
+ return
CompletableFuture.completedFuture(watcher);
+ }, CompletableFuture::failedFuture);
+ }).whenComplete((watcher, exception) -> {
+ if (exception != null) {
+ if (watcherRef.get() != null) {
+ watcher.close();
Review Comment:
Addressed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]