This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 04ff2c9ea58 [improve][broker] PIP-442: Add memory limits for topic list watcher (part 2) (#25070)
04ff2c9ea58 is described below

commit 04ff2c9ea58b6247da932735b4a066e10ac5050f
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Jan 16 17:45:10 2026 +0200

    [improve][broker] PIP-442: Add memory limits for topic list watcher (part 2) (#25070)
    
    (cherry picked from commit 805c71de3d07a89569c3a7763b31c9b042f6776f)
---
 .../pulsar/broker/service/PulsarCommandSender.java |  17 +-
 .../broker/service/PulsarCommandSenderImpl.java    |  32 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |   1 +
 .../pulsar/broker/service/TopicListService.java    | 441 +++++++++++++++++----
 .../broker/service/TopicListServiceTest.java       | 294 +++++++++++++-
 .../broker/service/TopicListWatcherTest.java       |  30 +-
 ...nConsumerBackPressureMultipleConsumersTest.java |   2 +-
 ...icWatcherBackPressureMultipleConsumersTest.java | 213 ++++++++++
 .../apache/pulsar/common/protocol/Commands.java    |   3 +-
 .../semaphore/AsyncDualMemoryLimiterImpl.java      |   6 +
 .../semaphore/AsyncDualMemoryLimiterUtil.java      |  11 +-
 .../common/semaphore/AsyncSemaphoreImpl.java       |   5 +
 .../org/apache/pulsar/common/topics/TopicList.java |  23 +-
 .../semaphore/AsyncDualMemoryLimiterUtilTest.java  |  27 +-
 .../pulsar/proxy/server/LookupProxyHandler.java    |   1 +
 15 files changed, 978 insertions(+), 128 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
index 8e2deff31d0..ca80ca49d76 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
@@ -20,10 +20,11 @@ package org.apache.pulsar.broker.service;
 
 
 import io.netty.util.concurrent.Future;
+import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
+import java.util.function.Function;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
@@ -55,7 +56,8 @@ public interface PulsarCommandSender {
 
     CompletableFuture<Void> sendGetTopicsOfNamespaceResponse(List<String> 
topics, String topicsHash, boolean filtered,
                                                              boolean changed, 
long requestId,
-                                                             
Consumer<Throwable> permitAcquireErrorHandler);
+                                                             
Function<Throwable, CompletableFuture<Void>>
+                                                                     
permitAcquireErrorHandler);
 
     void sendGetSchemaResponse(long requestId, SchemaInfo schema, 
SchemaVersion version);
 
@@ -96,8 +98,13 @@ public interface PulsarCommandSender {
 
     void sendEndTxnErrorResponse(long requestId, TxnID txnID, ServerError 
error, String message);
 
-    void sendWatchTopicListSuccess(long requestId, long watcherId, String 
topicsHash, List<String> topics);
+    CompletableFuture<Void> sendWatchTopicListSuccess(long requestId, long 
watcherId, String topicsHash,
+                                                      Collection<String> 
topics,
+                                                      Function<Throwable, 
CompletableFuture<Void>>
+                                                              
permitAcquireErrorHandler);
 
-    void sendWatchTopicListUpdate(long watcherId,
-                                         List<String> newTopics, List<String> 
deletedTopics, String topicsHash);
+    CompletableFuture<Void> sendWatchTopicListUpdate(long watcherId,
+                                  List<String> newTopics, List<String> 
deletedTopics, String topicsHash,
+                                  Function<Throwable, CompletableFuture<Void>>
+                                                             
permitAcquireErrorHandler);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index 275e049255c..2570cb4431f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -24,10 +24,11 @@ import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
+import java.util.function.Function;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
@@ -129,7 +130,9 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
     @Override
     public CompletableFuture<Void> 
sendGetTopicsOfNamespaceResponse(List<String> topics, String topicsHash,
                                                                     boolean 
filtered, boolean changed, long requestId,
-                                                                    
Consumer<Throwable> permitAcquireErrorHandler) {
+                                                                    
Function<Throwable,
+                                                                            
CompletableFuture<Void>>
+                                                                               
 permitAcquireErrorHandler) {
         BaseCommand command = 
Commands.newGetTopicsOfNamespaceResponseCommand(topics, topicsHash,
                 filtered, changed, requestId);
         safeIntercept(command, cnx);
@@ -366,27 +369,32 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
 
     /***
      * @param topics topic names which are matching, the topic name contains 
the partition suffix.
+     * @return a CompletableFuture&lt;Void&gt; that completes when the 
operation finishes
      */
     @Override
-    public void sendWatchTopicListSuccess(long requestId, long watcherId, 
String topicsHash, List<String> topics) {
+    public CompletableFuture<Void> sendWatchTopicListSuccess(long requestId, 
long watcherId, String topicsHash,
+                                                             
Collection<String> topics,
+                                                             
Function<Throwable, CompletableFuture<Void>>
+                                                                         
permitAcquireErrorHandler) {
         BaseCommand command = Commands.newWatchTopicListSuccess(requestId, 
watcherId, topicsHash, topics);
-        interceptAndWriteCommand(command);
+        safeIntercept(command, cnx);
+        return acquireDirectMemoryPermitsAndWriteAndFlush(cnx.ctx(), 
maxTopicListInFlightLimiter, () -> !cnx.isActive(),
+                command, permitAcquireErrorHandler);
     }
 
     /***
      * {@inheritDoc}
+     * @return a CompletableFuture that completes when the watch topic list 
update operation finishes
      */
     @Override
-    public void sendWatchTopicListUpdate(long watcherId,
-                                         List<String> newTopics, List<String> 
deletedTopics, String topicsHash) {
+    public CompletableFuture<Void> sendWatchTopicListUpdate(long watcherId, 
List<String> newTopics,
+                                                            List<String> 
deletedTopics, String topicsHash,
+                                                            
Function<Throwable, CompletableFuture<Void>>
+                                                                        
permitAcquireErrorHandler) {
         BaseCommand command = Commands.newWatchTopicUpdate(watcherId, 
newTopics, deletedTopics, topicsHash);
-        interceptAndWriteCommand(command);
-    }
-
-    private void interceptAndWriteCommand(BaseCommand command) {
         safeIntercept(command, cnx);
-        ByteBuf outBuf = Commands.serializeWithSize(command);
-        writeAndFlush(outBuf);
+        return acquireDirectMemoryPermitsAndWriteAndFlush(cnx.ctx(), 
maxTopicListInFlightLimiter, () -> !cnx.isActive(),
+                command, permitAcquireErrorHandler);
     }
 
     private void writeAndFlush(ByteBuf outBuf) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 991fc8d536c..cada292dbca 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2695,6 +2695,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                                             
commandSender.sendErrorResponse(requestId,
                                                                     
ServerError.TooManyRequests,
                                                                     "Cannot 
acquire permits for direct memory");
+                                                            return 
CompletableFuture.completedFuture(null);
                                                         });
                                             }, t -> {
                                                 log.warn("[{}] Failed to 
acquire heap memory permits for "
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
index ef2ea284cf7..ec0f9b73e7e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
@@ -18,78 +18,166 @@
  */
 package org.apache.pulsar.broker.service;
 
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
+import java.util.function.BooleanSupplier;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.resources.TopicResources;
+import org.apache.pulsar.broker.topiclistlimit.TopicListMemoryLimiter;
+import org.apache.pulsar.broker.topiclistlimit.TopicListSizeResultCache;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiter;
+import org.apache.pulsar.common.semaphore.AsyncSemaphore;
 import org.apache.pulsar.common.topics.TopicList;
 import org.apache.pulsar.common.topics.TopicsPattern;
 import org.apache.pulsar.common.topics.TopicsPatternFactory;
+import org.apache.pulsar.common.util.Backoff;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class TopicListService {
-
-
     public static class TopicListWatcher implements BiConsumer<String, 
NotificationType> {
-
+        // upper bound for buffered topic list updates
+        private static final int DEFAULT_TOPIC_LIST_UPDATE_MAX_QUEUE_SIZE = 
10000;
         /** Topic names which are matching, the topic name contains the 
partition suffix. **/
-        private final List<String> matchingTopics;
+        private final Set<String> matchingTopics;
         private final TopicListService topicListService;
         private final long id;
+        private final NamespaceName namespace;
         /** The regexp for the topic name(not contains partition suffix). **/
         private final TopicsPattern topicsPattern;
+        private final Executor executor;
+        private volatile boolean closed = false;
+        private boolean sendingInProgress;
+        private final BlockingDeque<Runnable> sendTopicListUpdateTasks;
+        private boolean updatingTopics;
 
-        /***
-         * @param topicsPattern The regexp for the topic name(not contains 
partition suffix).
-         */
         public TopicListWatcher(TopicListService topicListService, long id,
-                                TopicsPattern topicsPattern, List<String> 
topics) {
+                                NamespaceName namespace, TopicsPattern 
topicsPattern, List<String> topics,
+                                Executor executor, int 
topicListUpdateMaxQueueSize) {
             this.topicListService = topicListService;
             this.id = id;
+            this.namespace = namespace;
             this.topicsPattern = topicsPattern;
-            this.matchingTopics = TopicList.filterTopics(topics, 
topicsPattern);
+            this.executor = executor;
+            this.matchingTopics =
+                    TopicList.filterTopics(topics, topicsPattern, 
Collectors.toCollection(LinkedHashSet::new));
+            // start with in progress state since topic list update will be 
sent first
+            this.sendingInProgress = true;
+            this.sendTopicListUpdateTasks =
+                    new LinkedBlockingDeque<>(topicListUpdateMaxQueueSize);
         }
 
-        public List<String> getMatchingTopics() {
-            return matchingTopics;
+        public synchronized Collection<String> getMatchingTopics() {
+            return new ArrayList<>(matchingTopics);
         }
 
         /***
          * @param topicName topic name which contains partition suffix.
          */
         @Override
-        public void accept(String topicName, NotificationType 
notificationType) {
+        public synchronized void accept(String topicName, NotificationType 
notificationType) {
+            if (closed || updatingTopics) {
+                return;
+            }
             String partitionedTopicName = 
TopicName.get(topicName).getPartitionedTopicName();
             String domainLessTopicName = 
TopicList.removeTopicDomainScheme(partitionedTopicName);
 
             if (topicsPattern.matches(domainLessTopicName)) {
-                List<String> newTopics;
-                List<String> deletedTopics;
+                List<String> newTopics = Collections.emptyList();
+                List<String> deletedTopics = Collections.emptyList();
                 if (notificationType == NotificationType.Deleted) {
-                    newTopics = Collections.emptyList();
-                    deletedTopics = Collections.singletonList(topicName);
-                    matchingTopics.remove(topicName);
-                } else {
-                    deletedTopics = Collections.emptyList();
+                    if (matchingTopics.remove(topicName)) {
+                        deletedTopics = Collections.singletonList(topicName);
+                    }
+                } else if (matchingTopics.add(topicName)) {
                     newTopics = Collections.singletonList(topicName);
-                    matchingTopics.add(topicName);
                 }
-                String hash = TopicList.calculateHash(matchingTopics);
-                topicListService.sendTopicListUpdate(id, hash, deletedTopics, 
newTopics);
+                if (!newTopics.isEmpty() || !deletedTopics.isEmpty()) {
+                    String hash = TopicList.calculateHash(matchingTopics);
+                    sendTopicListUpdate(hash, deletedTopics, newTopics);
+                }
+            }
+        }
+
+        // sends updates one-by-one so that ordering is retained
+        private synchronized void sendTopicListUpdate(String hash, 
List<String> deletedTopics, List<String> newTopics) {
+            if (closed || updatingTopics) {
+                return;
+            }
+            Runnable task = () -> topicListService.sendTopicListUpdate(id, 
hash, deletedTopics, newTopics,
+                    this::sendingCompleted);
+            if (!sendingInProgress) {
+                sendingInProgress = true;
+                executor.execute(task);
+            } else {
+                // if sendTopicListSuccess hasn't completed, add to a queue to 
be executed after it completes
+                if (!sendTopicListUpdateTasks.offer(task)) {
+                    log.warn("Update queue was full for watcher id {} matching 
{}. Performing full refresh.", id,
+                            topicsPattern.inputPattern());
+                    if (!updatingTopics) {
+                        updatingTopics = true;
+                        sendTopicListUpdateTasks.clear();
+                        matchingTopics.clear();
+                        executor.execute(() -> 
topicListService.updateTopicListWatcher(this));
+                    }
+                }
+            }
+        }
+
+        // callback that triggers sending the next possibly buffered update
+        @VisibleForTesting
+        synchronized void sendingCompleted() {
+            if (closed) {
+                sendTopicListUpdateTasks.clear();
+                return;
+            }
+            // Execute the next task
+            Runnable task = sendTopicListUpdateTasks.poll();
+            if (task != null) {
+                executor.execute(task);
+            } else {
+                sendingInProgress = false;
             }
         }
+
+        public synchronized void close() {
+            closed = true;
+            sendTopicListUpdateTasks.clear();
+        }
+
+        synchronized void updateTopics(List<String> topics) {
+            matchingTopics.clear();
+            TopicList.filterTopicsToStream(topics, 
topicsPattern).forEach(matchingTopics::add);
+            updatingTopics = false;
+        }
     }
 
 
@@ -97,23 +185,39 @@ public class TopicListService {
 
     private final NamespaceService namespaceService;
     private final TopicResources topicResources;
+    private final PulsarService pulsar;
     private final ServerCnx connection;
     private final boolean enableSubscriptionPatternEvaluation;
     private final int maxSubscriptionPatternLength;
+    private final int topicListUpdateMaxQueueSize;
     private final ConcurrentLongHashMap<CompletableFuture<TopicListWatcher>> 
watchers;
-
+    private final Backoff retryBackoff;
 
     public TopicListService(PulsarService pulsar, ServerCnx connection,
                             boolean enableSubscriptionPatternEvaluation, int 
maxSubscriptionPatternLength) {
+        this(pulsar, connection, enableSubscriptionPatternEvaluation, 
maxSubscriptionPatternLength,
+                TopicListWatcher.DEFAULT_TOPIC_LIST_UPDATE_MAX_QUEUE_SIZE);
+    }
+
+    @VisibleForTesting
+    public TopicListService(PulsarService pulsar, ServerCnx connection,
+                            boolean enableSubscriptionPatternEvaluation, int 
maxSubscriptionPatternLength,
+                            int topicListUpdateMaxQueueSize) {
         this.namespaceService = pulsar.getNamespaceService();
+        this.pulsar = pulsar;
         this.connection = connection;
         this.enableSubscriptionPatternEvaluation = 
enableSubscriptionPatternEvaluation;
         this.maxSubscriptionPatternLength = maxSubscriptionPatternLength;
+        this.topicListUpdateMaxQueueSize = topicListUpdateMaxQueueSize;
         this.watchers = 
ConcurrentLongHashMap.<CompletableFuture<TopicListWatcher>>newBuilder()
                 .expectedItems(8)
                 .concurrencyLevel(1)
                 .build();
         this.topicResources = pulsar.getPulsarResources().getTopicResources();
+        this.retryBackoff = new Backoff(
+                100, TimeUnit.MILLISECONDS,
+                25, TimeUnit.SECONDS,
+                0, TimeUnit.MILLISECONDS);
     }
 
     public void inactivate() {
@@ -162,38 +266,16 @@ public class TopicListService {
         CompletableFuture<TopicListWatcher> existingWatcherFuture = 
watchers.putIfAbsent(watcherId, watcherFuture);
 
         if (existingWatcherFuture != null) {
-            if (existingWatcherFuture.isDone() && 
!existingWatcherFuture.isCompletedExceptionally()) {
-                TopicListWatcher watcher = existingWatcherFuture.getNow(null);
-                log.info("[{}] Watcher with the same id is already created:"
-                                + " watcherId={}, watcher={}",
-                        connection.toString(), watcherId, watcher);
-                watcherFuture = existingWatcherFuture;
-            } else {
-                // There was an early request to create a watcher with the 
same watcherId. This can happen when
-                // client timeout is lower the broker timeouts. We need to 
wait until the previous watcher
-                // creation request either completes or fails.
-                log.warn("[{}] Watcher with id is already present on the 
connection,"
-                        + " consumerId={}", connection.toString(), watcherId);
-                ServerError error;
-                if (!existingWatcherFuture.isDone()) {
-                    error = ServerError.ServiceNotReady;
-                } else {
-                    error = ServerError.UnknownError;
-                    watchers.remove(watcherId, existingWatcherFuture);
-                }
-                connection.getCommandSender().sendErrorResponse(requestId, 
error,
-                        "Topic list watcher is already present on the 
connection");
-                lookupSemaphore.release();
-                return;
-            }
+            log.info("[{}] Watcher with the same watcherId={} is already 
created.", connection, watcherId);
+            // use the existing watcher if it's already created
+            watcherFuture = existingWatcherFuture;
         } else {
             initializeTopicsListWatcher(watcherFuture, namespaceName, 
watcherId, topicsPattern);
         }
 
-
         CompletableFuture<TopicListWatcher> finalWatcherFuture = watcherFuture;
         finalWatcherFuture.thenAccept(watcher -> {
-                    List<String> topicList = watcher.getMatchingTopics();
+                    Collection<String> topicList = watcher.getMatchingTopics();
                     String hash = TopicList.calculateHash(topicList);
                     if (hash.equals(topicsHash)) {
                         topicList = Collections.emptyList();
@@ -203,7 +285,8 @@ public class TopicListService {
                                 "[{}] Received WatchTopicList for namespace 
[//{}] by {}",
                                 connection.toString(), namespaceName, 
requestId);
                     }
-                    
connection.getCommandSender().sendWatchTopicListSuccess(requestId, watcherId, 
hash, topicList);
+                    sendTopicListSuccessWithPermitAcquiringRetries(watcherId, 
requestId, topicList, hash,
+                            watcher::sendingCompleted, watcher::close);
                     lookupSemaphore.release();
                 })
                 .exceptionally(ex -> {
@@ -218,30 +301,158 @@ public class TopicListService {
                 });
     }
 
+    private void sendTopicListSuccessWithPermitAcquiringRetries(long 
watcherId, long requestId,
+                                                                
Collection<String> topicList,
+                                                                String hash,
+                                                                Runnable 
successfulCompletionCallback,
+                                                                Runnable 
failedCompletionCallback) {
+        performOperationWithPermitAcquiringRetries(watcherId, "topic list 
success", permitAcquireErrorHandler ->
+                () -> connection.getCommandSender()
+                        .sendWatchTopicListSuccess(requestId, watcherId, hash, 
topicList, permitAcquireErrorHandler)
+                        .whenComplete((__, t) -> {
+                            if (t != null) {
+                                // this is an unexpected case
+                                log.warn("[{}] Failed to send topic list 
success for watcherId={}. "
+                                        + "Watcher is not active.", 
connection, watcherId, t);
+                                failedCompletionCallback.run();
+                            } else {
+                                // completed successfully, run the callback
+                                successfulCompletionCallback.run();
+                            }
+                        }));
+    }
+
     /***
      * @param topicsPattern The regexp for the topic name(not contains 
partition suffix).
      */
     public void 
initializeTopicsListWatcher(CompletableFuture<TopicListWatcher> watcherFuture,
             NamespaceName namespace, long watcherId, TopicsPattern 
topicsPattern) {
-        namespaceService.getListOfPersistentTopics(namespace).
-                thenApply(topics -> {
-                    TopicListWatcher watcher = new TopicListWatcher(this, 
watcherId, topicsPattern, topics);
-                    topicResources.registerPersistentTopicListener(namespace, 
watcher);
-                    return watcher;
-                }).
-                whenComplete((watcher, exception) -> {
-                    if (exception != null) {
-                        watcherFuture.completeExceptionally(exception);
-                    } else {
-                        if (!watcherFuture.complete(watcher)) {
-                            log.warn("[{}] Watcher future was already 
completed. Deregistering watcherId={}.",
-                                    connection.toString(), watcherId);
-                            
topicResources.deregisterPersistentTopicListener(watcher);
-                        }
-                    }
-                });
+        BooleanSupplier isPermitRequestCancelled = () -> 
!connection.isActive() || !watchers.containsKey(watcherId);
+        if (isPermitRequestCancelled.getAsBoolean()) {
+            return;
+        }
+        TopicListSizeResultCache.ResultHolder listSizeHolder = 
pulsar.getBrokerService().getTopicListSizeResultCache()
+                .getTopicListSize(namespace.toString(), 
CommandGetTopicsOfNamespace.Mode.PERSISTENT);
+        AsyncDualMemoryLimiter maxTopicListInFlightLimiter = 
pulsar.getBrokerService().getMaxTopicListInFlightLimiter();
+
+        listSizeHolder.getSizeAsync().thenCompose(initialSize -> {
+            // use heap size limiter to avoid broker getting overwhelmed by a 
lot of concurrent topic list requests
+            return maxTopicListInFlightLimiter.withAcquiredPermits(initialSize,
+                    AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, 
isPermitRequestCancelled, initialPermits -> {
+                        AtomicReference<TopicListWatcher> watcherRef = new 
AtomicReference<>();
+                        return 
namespaceService.getListOfPersistentTopics(namespace).thenCompose(topics -> {
+                            long actualSize = 
TopicListMemoryLimiter.estimateTopicListSize(topics);
+                            listSizeHolder.updateSize(actualSize);
+                            // register watcher immediately so that we don't 
lose events
+                            TopicListWatcher watcher =
+                                    new TopicListWatcher(this, watcherId, 
namespace, topicsPattern, topics,
+                                            connection.ctx().executor(), 
topicListUpdateMaxQueueSize);
+                            watcherRef.set(watcher);
+                            
topicResources.registerPersistentTopicListener(namespace, watcher);
+                            // use updated permits to slow down responses so 
that backpressure gets applied
+                            return 
maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
+                                    isPermitRequestCancelled, updatedPermits 
-> {
+                                        // reset retry backoff
+                                        retryBackoff.reset();
+                                        // just return the watcher which was 
already created before
+                                        return 
CompletableFuture.completedFuture(watcher);
+                                    }, CompletableFuture::failedFuture);
+                        }).whenComplete((watcher, exception) -> {
+                            if (exception != null) {
+                                TopicListWatcher w = watcherRef.get();
+                                if (w != null) {
+                                    w.close();
+                                    
topicResources.deregisterPersistentTopicListener(w);
+                                }
+                                // triggers a retry
+                                throw 
FutureUtil.wrapToCompletionException(exception);
+                            } else {
+                                if (!watcherFuture.complete(watcher)) {
+                                    log.warn("[{}] Watcher future was already 
completed. Deregistering "
+                                            + "watcherId={}.", connection, 
watcherId);
+                                    watcher.close();
+                                    
topicResources.deregisterPersistentTopicListener(watcher);
+                                    watchers.remove(watcherId, watcherFuture);
+                                }
+                            }
+                        });
+                    }, CompletableFuture::failedFuture);
+        }).exceptionally(t -> {
+            Throwable unwrappedException = 
FutureUtil.unwrapCompletionException(t);
+            if (!isPermitRequestCancelled.getAsBoolean() && (
+                    unwrappedException instanceof 
AsyncSemaphore.PermitAcquireTimeoutException
+                            || unwrappedException instanceof 
AsyncSemaphore.PermitAcquireQueueFullException)) {
+                // retry with backoff if permit acquisition fails due to 
timeout or queue full
+                long retryAfterMillis = this.retryBackoff.next();
+                log.info("[{}] {} when initializing topic list watcher 
watcherId={} for namespace {}. Retrying in {} "
+                                + "ms.", connection, 
unwrappedException.getMessage(), watcherId, namespace,
+                        retryAfterMillis);
+                connection.ctx().executor()
+                        .schedule(() -> 
initializeTopicsListWatcher(watcherFuture, namespace, watcherId, topicsPattern),
+                                retryAfterMillis, TimeUnit.MILLISECONDS);
+            } else {
+                log.warn("[{}] Failed to initialize topic list watcher 
watcherId={} for namespace {}.", connection,
+                        watcherId, namespace, unwrappedException);
+                watcherFuture.completeExceptionally(unwrappedException);
+            }
+            return null;
+        });
     }
 
+    // Re-lists the persistent topics of the watcher's namespace and passes the result to
+    // watcher.updateTopics(...). The listing runs while holding HEAP_MEMORY permits of the broker's
+    // topic list in-flight limiter: first the cached size estimate, then the actual estimated size.
+    // Nothing is done when the connection is inactive or the watcher id is no longer registered.
+    // A permit acquire timeout or a full permit queue schedules another attempt after a backoff delay;
+    // every other failure is only logged.
+    void updateTopicListWatcher(TopicListWatcher watcher) {
+        long watcherId = watcher.id;
+        // evaluated up front, handed to the limiter, and consulted again before scheduling a retry
+        BooleanSupplier isPermitRequestCancelled = () -> 
!connection.isActive() || !watchers.containsKey(watcherId);
+        if (isPermitRequestCancelled.getAsBoolean()) {
+            return;
+        }
+        NamespaceName namespace = watcher.namespace;
+        TopicListSizeResultCache.ResultHolder listSizeHolder = 
pulsar.getBrokerService().getTopicListSizeResultCache()
+                .getTopicListSize(namespace.toString(), 
CommandGetTopicsOfNamespace.Mode.PERSISTENT);
+        AsyncDualMemoryLimiter maxTopicListInFlightLimiter = 
pulsar.getBrokerService().getMaxTopicListInFlightLimiter();
+
+        listSizeHolder.getSizeAsync().thenCompose(initialSize -> {
+            // use heap size limiter to avoid broker getting overwhelmed by a 
lot of concurrent topic list requests
+            return maxTopicListInFlightLimiter.withAcquiredPermits(initialSize,
+                    AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, 
isPermitRequestCancelled, initialPermits -> {
+                        return 
namespaceService.getListOfPersistentTopics(namespace).thenCompose(topics -> {
+                            long actualSize = 
TopicListMemoryLimiter.estimateTopicListSize(topics);
+                            // feed the measured size back into the size cache holder
+                            listSizeHolder.updateSize(actualSize);
+                            // use updated permits to slow down responses so 
that backpressure gets applied
+                            return 
maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
+                                    isPermitRequestCancelled, updatedPermits 
-> {
+                                        // reset retry backoff
+                                        retryBackoff.reset();
+                                        // just return topics here
+                                        return 
CompletableFuture.completedFuture(topics);
+                                    }, CompletableFuture::failedFuture);
+                        }).whenComplete((topics, exception) -> {
+                            if (exception != null) {
+                                // triggers a retry
+                                throw 
FutureUtil.wrapToCompletionException(exception);
+                            } else {
+                                watcher.updateTopics(topics);
+                            }
+                        });
+                    }, CompletableFuture::failedFuture);
+        }).exceptionally(t -> {
+            Throwable unwrappedException = 
FutureUtil.unwrapCompletionException(t);
+            if (!isPermitRequestCancelled.getAsBoolean() && (
+                    unwrappedException instanceof 
AsyncSemaphore.PermitAcquireTimeoutException
+                            || unwrappedException instanceof 
AsyncSemaphore.PermitAcquireQueueFullException)) {
+                // retry with backoff if permit acquisition fails due to 
timeout or queue full
+                long retryAfterMillis = this.retryBackoff.next();
+                log.info("[{}] {} when updating topic list watcher 
watcherId={} for namespace {}. Retrying in {} "
+                                + "ms.", connection, 
unwrappedException.getMessage(), watcherId, namespace,
+                        retryAfterMillis);
+                connection.ctx().executor()
+                        .schedule(() -> updateTopicListWatcher(watcher), 
retryAfterMillis, TimeUnit.MILLISECONDS);
+            } else {
+                // no retry on this path: the watcher keeps whatever topic state it had before
+                log.warn("[{}] Failed to update topic list watcher 
watcherId={} for namespace {}.", connection,
+                        watcherId, namespace, unwrappedException);
+            }
+            return null;
+        });
+    }
 
     public void handleWatchTopicListClose(CommandWatchTopicListClose 
commandWatchTopicListClose) {
         long requestId = commandWatchTopicListClose.getRequestId();
@@ -251,7 +462,7 @@ public class TopicListService {
     }
 
     public void deleteTopicListWatcher(Long watcherId) {
-        CompletableFuture<TopicListWatcher> watcherFuture = 
watchers.get(watcherId);
+        CompletableFuture<TopicListWatcher> watcherFuture = 
watchers.remove(watcherId);
         if (watcherFuture == null) {
             log.info("[{}] TopicListWatcher was not registered on the 
connection: {}",
                     watcherId, connection.toString());
@@ -265,21 +476,20 @@ public class TopicListService {
             // create operation will complete, the new watcher will be 
discarded.
             log.info("[{}] Closed watcher before its creation was completed. 
watcherId={}",
                     connection.toString(), watcherId);
-            watchers.remove(watcherId);
-            return;
-        }
-
-        if (watcherFuture.isCompletedExceptionally()) {
-            log.info("[{}] Closed watcher that already failed to be created. 
watcherId={}",
-                    connection.toString(), watcherId);
-            watchers.remove(watcherId);
             return;
         }
 
-        // Proceed with normal watcher close
-        
topicResources.deregisterPersistentTopicListener(watcherFuture.getNow(null));
-        watchers.remove(watcherId);
-        log.info("[{}] Closed watcher, watcherId={}", connection.toString(), 
watcherId);
+        // deregister topic listener while avoiding race conditions
+        watcherFuture.whenComplete((watcher, t) -> {
+            if (watcher != null) {
+                topicResources.deregisterPersistentTopicListener(watcher);
+                watcher.close();
+                log.info("[{}] Closed watcher, watcherId={}", 
connection.toString(), watcherId);
+            } else if (t != null) {
+                log.info("[{}] Closed watcher that failed to be created. 
watcherId={}",
+                        connection.toString(), watcherId);
+            }
+        });
     }
 
     /**
@@ -287,9 +497,76 @@ public class TopicListService {
      * @param newTopics topics names added(contains the partition suffix).
      */
     public void sendTopicListUpdate(long watcherId, String topicsHash, 
List<String> deletedTopics,
-                                    List<String> newTopics) {
-        connection.getCommandSender().sendWatchTopicListUpdate(watcherId, 
newTopics, deletedTopics, topicsHash);
+                                    List<String> newTopics, Runnable 
completionCallback) {
+        // The send goes through the permit-acquire retry helper; the handler it supplies is passed on to the
+        // command sender as the last argument.
+        // NOTE(review): the helper skips the operation when the connection is inactive or the watcher has been
+        // removed, and completionCallback is not invoked on that path - confirm that callers tolerate this.
+        performOperationWithPermitAcquiringRetries(watcherId, "topic list 
update", permitAcquireErrorHandler ->
+                () -> connection.getCommandSender()
+                        .sendWatchTopicListUpdate(watcherId, newTopics, 
deletedTopics, topicsHash,
+                                permitAcquireErrorHandler)
+                        .whenComplete((__, t) -> {
+                            if (t != null) {
+                                // this is an unexpected case
+                                log.warn("[{}] Failed to send topic list 
update for watcherId={}. Watcher will be in "
+                                        + "inconsistent state.", connection, 
watcherId, t);
+                            }
+                            // runs once the send future completes, after success and failure alike
+                            completionCallback.run();
+                        }));
     }
 
+    // Runs an asynchronous operation and keeps re-running it, without an upper bound on attempts, for as long
+    // as permit acquisition fails. Each further attempt is delayed by the retry backoff. The factory receives
+    // the permit-acquire error handler and returns the operation that has to use it.
+    private void performOperationWithPermitAcquiringRetries(long watcherId, String operationName,
+                                                            Function<Function<Throwable, CompletableFuture<Void>>,
+                                                                    Supplier<CompletableFuture<Void>>>
+                                                                    asyncOperationFactory) {
+        // The error handler must be able to re-run the guarded operation, while the operation can only be built
+        // once the handler exists. The reference breaks that cycle: it is populated after both are created.
+        AtomicReference<Supplier<CompletableFuture<Void>>> guardedOperationRef = new AtomicReference<>();
+        Supplier<CompletableFuture<Void>> rerunGuardedOperation = () -> guardedOperationRef.get().get();
+        Function<Throwable, CompletableFuture<Void>> retryHandler =
+                createPermitAcquireErrorHandler(watcherId, operationName, rerunGuardedOperation);
+        Supplier<CompletableFuture<Void>> operation = asyncOperationFactory.apply(retryHandler);
+        Supplier<CompletableFuture<Void>> guardedOperation = () -> {
+            boolean watcherStillServed = connection.isActive() && watchers.containsKey(watcherId);
+            if (watcherStillServed) {
+                // a completed attempt clears the accumulated backoff
+                return operation.get().thenRun(() -> retryBackoff.reset());
+            }
+            // connection closed or watcher removed in the meantime: nothing to do
+            return CompletableFuture.completedFuture(null);
+        };
+        guardedOperationRef.set(guardedOperation);
+        // first attempt
+        guardedOperation.get();
+    }
 
+    // Builds the handler that is invoked with the failure when permits for an operation could not be acquired.
+    // The handler stops (returning an already completed future) when the acquisition was cancelled, the
+    // limiter was already closed, the connection is no longer active or the watcher has been removed.
+    // Otherwise it schedules operationRef on the channel's event loop after the next backoff delay and returns
+    // a future that is completed from the outcome of that re-run.
+    private Function<Throwable, CompletableFuture<Void>> createPermitAcquireErrorHandler(long watcherId,
+                                                                                         String operationName,
+                                                                                         Supplier<CompletableFuture
+                                                                                                 <Void>> operationRef) {
+        ScheduledExecutorService scheduledExecutor = connection.ctx().channel().eventLoop();
+        // counts the retries of this one operation; only used for logging
+        AtomicInteger retryCount = new AtomicInteger(0);
+        return t -> {
+            Throwable unwrappedException = FutureUtil.unwrapCompletionException(t);
+            if (unwrappedException instanceof AsyncSemaphore.PermitAcquireCancelledException
+                    || unwrappedException instanceof AsyncSemaphore.PermitAcquireAlreadyClosedException
+                    || !connection.isActive()
+                    || !watchers.containsKey(watcherId)) {
+                // stop retrying and complete successfully
+                return CompletableFuture.completedFuture(null);
+            }
+            long retryDelay = retryBackoff.next();
+            // use the value returned by the increment so that the logged number belongs to this attempt
+            int attempt = retryCount.incrementAndGet();
+            // log the unwrapped cause, like the other permit retry log statements in this class
+            log.info("[{}] Cannot acquire direct memory tokens for sending {}. Retry {} in {} ms. {}", connection,
+                    operationName, attempt, retryDelay, unwrappedException.getMessage());
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            scheduledExecutor.schedule(() -> FutureUtil.completeAfter(future, operationRef.get()), retryDelay,
+                    TimeUnit.MILLISECONDS);
+            return future;
+        };
+    }
+
+    // Test hook: looks up the watcher future registered under the given watcher id.
+    // Presumably yields null for an id that is not registered - the tests null-check the result.
+    @VisibleForTesting
+    CompletableFuture<TopicListWatcher> getWatcherFuture(long watcherId) {
+        return watchers.get(watcherId);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java
index 9109828c025..d85f244b9a3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java
@@ -18,31 +18,70 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.when;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.EventLoop;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.ImmediateEventExecutor;
+import io.netty.util.concurrent.ScheduledFuture;
 import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.ListUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.resources.TopicResources;
+import org.apache.pulsar.broker.topiclistlimit.TopicListSizeResultCache;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiter;
+import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiterImpl;
+import org.apache.pulsar.common.semaphore.AsyncSemaphore;
 import org.apache.pulsar.common.topics.TopicList;
 import org.apache.pulsar.common.topics.TopicsPattern;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.awaitility.Awaitility;
+import org.jspecify.annotations.NonNull;
+import org.mockito.InOrder;
 import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+@Slf4j
 public class TopicListServiceTest {
 
     private TopicListService topicListService;
@@ -52,27 +91,99 @@ public class TopicListServiceTest {
     private TopicResources topicResources;
     private final TopicsPattern.RegexImplementation 
topicsPatternImplementation =
             TopicsPattern.RegexImplementation.RE2J_WITH_JDK_FALLBACK;
+    private EventLoop eventLoop;
+    private PulsarCommandSender pulsarCommandSender;
+    private Consumer<Notification> notificationConsumer;
+    private AsyncDualMemoryLimiterImpl memoryLimiter;
+    private ScheduledExecutorService scheduledExecutorService;
+    private PulsarService pulsar;
 
     @BeforeMethod(alwaysRun = true)
     public void setup() throws Exception {
         lookupSemaphore = new Semaphore(1);
         lookupSemaphore.acquire();
         topicListFuture = new CompletableFuture<>();
-        topicResources = mock(TopicResources.class);
 
-        PulsarService pulsar = mock(PulsarService.class);
-        
when(pulsar.getNamespaceService()).thenReturn(mock(NamespaceService.class));
+        // Capture the listener handed to MetadataStore.registerListener so that tests can inject notifications.
+        // Presumably the TopicResources constructor registers it - the reference is read right after construction.
+        AtomicReference<Consumer<Notification>> listenerRef = new 
AtomicReference<>();
+        MetadataStore metadataStore = mock(MetadataStore.class);
+        doAnswer(invocationOnMock -> {
+            listenerRef.set(invocationOnMock.getArgument(0));
+            return null;
+        }).when(metadataStore).registerListener(any());
+        topicResources = spy(new TopicResources(metadataStore));
+        notificationConsumer = listenerRef.get();
+
+        pulsar = mock(PulsarService.class);
+        NamespaceService namespaceService = mock(NamespaceService.class);
+        when(pulsar.getNamespaceService()).thenReturn(namespaceService);
+        // answer lazily so that a test may replace topicListFuture before the next listing is requested
+        doAnswer(invocationOnMock -> topicListFuture)
+                .when(namespaceService).getListOfPersistentTopics(any());
         
when(pulsar.getPulsarResources()).thenReturn(mock(PulsarResources.class));
         
when(pulsar.getPulsarResources().getTopicResources()).thenReturn(topicResources);
-        
when(pulsar.getNamespaceService().getListOfPersistentTopics(any())).thenReturn(topicListFuture);
 
+        BrokerService brokerService = mock(BrokerService.class);
+        when(pulsar.getBrokerService()).thenReturn(brokerService);
+        TopicListSizeResultCache topicListSizeResultCache = 
mock(TopicListSizeResultCache.class);
+        
when(brokerService.getTopicListSizeResultCache()).thenReturn(topicListSizeResultCache);
+        TopicListSizeResultCache.ResultHolder resultHolder = 
mock(TopicListSizeResultCache.ResultHolder.class);
+        
doReturn(resultHolder).when(topicListSizeResultCache).getTopicListSize(anyString(),
 any());
+        
doReturn(CompletableFuture.completedFuture(1L)).when(resultHolder).getSizeAsync();
+
+        // a real limiter instance (closed again in cleanup) so that permit acquisition is actually exercised
+        memoryLimiter = new AsyncDualMemoryLimiterImpl(1_000_000, 10000, 500, 
1_000_000, 10000, 500);
+        
doReturn(memoryLimiter).when(brokerService).getMaxTopicListInFlightLimiter();
 
         connection = mock(ServerCnx.class);
         when(connection.getRemoteAddress()).thenReturn(new 
InetSocketAddress(10000));
-        
when(connection.getCommandSender()).thenReturn(mock(PulsarCommandSender.class));
+        pulsarCommandSender = mock(PulsarCommandSender.class);
+        when(connection.getCommandSender()).thenReturn(pulsarCommandSender);
+        when(connection.isActive()).thenReturn(true);
+        when(pulsarCommandSender.sendWatchTopicListUpdate(anyLong(), any(), 
any(), anyString(), any()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(pulsarCommandSender.sendWatchTopicListSuccess(anyLong(), 
anyLong(), anyString(), any(), any()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+
+        scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor();
 
-        topicListService = new TopicListService(pulsar, connection, true, 30);
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        when(connection.ctx()).thenReturn(ctx);
+        EventExecutor executor = spy(ImmediateEventExecutor.INSTANCE);
+        doReturn(executor).when(ctx).executor();
+        // schedule(...) on the spied executor is forwarded to the real single-thread scheduler created above
+        doAnswer(invocationOnMock -> {
+            
scheduledExecutorService.schedule(invocationOnMock.<Runnable>getArgument(0),
+                    invocationOnMock.getArgument(1), 
invocationOnMock.getArgument(2));
+            return mock(ScheduledFuture.class);
+        }).when(executor).schedule(any(Runnable.class), anyLong(), any());
+        Channel channel = mock(Channel.class);
+        when(ctx.channel()).thenReturn(channel);
+        eventLoop = mock(EventLoop.class);
+        when(channel.eventLoop()).thenReturn(eventLoop);
+        // same forwarding for the mocked channel event loop
+        doAnswer(invocationOnMock -> {
+            
scheduledExecutorService.schedule(invocationOnMock.<Runnable>getArgument(0),
+                    invocationOnMock.getArgument(1), 
invocationOnMock.getArgument(2));
+            return mock(ScheduledFuture.class);
+        }).when(eventLoop).schedule(any(Runnable.class), anyLong(), any());
+
+        topicListService = newTopicListService();
+
+    }
 
+    // Creates the service under test from the mocked pulsar and connection fields, using the
+    // four-argument constructor.
+    private @NonNull TopicListService newTopicListService() {
+        return new TopicListService(pulsar, connection, true, 30);
+    }
+
+    // Same as the no-argument variant, but additionally passes topicListUpdateMaxQueueSize as the last
+    // constructor argument.
+    private @NonNull TopicListService newTopicListService(int 
topicListUpdateMaxQueueSize) {
+        return new TopicListService(pulsar, connection, true, 30,
+                topicListUpdateMaxQueueSize);
+    }
+
+    // Releases what setup created: closes the memory limiter and stops the scheduler thread.
+    // The null checks cover a setup that failed before the fields were assigned.
+    @AfterMethod(alwaysRun = true)
+    void cleanup() {
+        if (memoryLimiter != null) {
+            memoryLimiter.close();
+        }
+        if (scheduledExecutorService != null) {
+            scheduledExecutorService.shutdownNow();
+        }
     }
 
     @Test
@@ -88,10 +199,41 @@ public class TopicListServiceTest {
         List<String> topics = 
Collections.singletonList("persistent://tenant/ns/topic1");
         String hash = TopicList.calculateHash(topics);
         topicListFuture.complete(topics);
-        Assert.assertEquals(1, lookupSemaphore.availablePermits());
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(1, 
lookupSemaphore.availablePermits()));
         verify(topicResources).registerPersistentTopicListener(
                 eq(NamespaceName.get("tenant/ns")), 
any(TopicListService.TopicListWatcher.class));
-        verify(connection.getCommandSender()).sendWatchTopicListSuccess(7, 13, 
hash, topics);
+        Collection<String> expectedTopics = new ArrayList<>(topics);
+        
verify(connection.getCommandSender()).sendWatchTopicListSuccess(eq(7L), 
eq(13L), eq(hash), eq(expectedTopics),
+                any());
+    }
+
+    // Issues a watch request while this test holds 1_000_000 HEAP_MEMORY permits of the limiter, keeps them
+    // for two seconds, and expects the listener registration and the success response only after the
+    // permits have been released.
+    @Test
+    public void testCommandWatchSuccessResponseWhenOutOfPermits() throws 
ExecutionException, InterruptedException {
+        // acquire all permits
+        AsyncDualMemoryLimiter.AsyncDualMemoryLimiterPermit permit =
+                memoryLimiter.acquire(1_000_000, 
AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY,
+                                Boolean.FALSE::booleanValue)
+                        .get();
+        topicListService.handleWatchTopicList(
+                NamespaceName.get("tenant/ns"),
+                13,
+                7,
+                "persistent://tenant/ns/topic\\d",
+                topicsPatternImplementation, null,
+                lookupSemaphore);
+        List<String> topics = 
Collections.singletonList("persistent://tenant/ns/topic1");
+        String hash = TopicList.calculateHash(topics);
+        topicListFuture.complete(topics);
+        // wait for acquisition to timeout a few times
+        Thread.sleep(2000);
+        // release the permits
+        memoryLimiter.release(permit);
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(1, 
lookupSemaphore.availablePermits()));
+        verify(topicResources).registerPersistentTopicListener(
+                eq(NamespaceName.get("tenant/ns")), 
any(TopicListService.TopicListWatcher.class));
+        Collection<String> expectedTopics = new ArrayList<>(topics);
+        
verify(connection.getCommandSender()).sendWatchTopicListSuccess(eq(7L), 
eq(13L), eq(hash), eq(expectedTopics),
+                any());
     }
 
     @Test
@@ -104,7 +246,7 @@ public class TopicListServiceTest {
                 topicsPatternImplementation, null,
                 lookupSemaphore);
         topicListFuture.completeExceptionally(new 
PulsarServerException("Error"));
-        Assert.assertEquals(1, lookupSemaphore.availablePermits());
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(1, 
lookupSemaphore.availablePermits()));
         verifyNoInteractions(topicResources);
         verify(connection.getCommandSender()).sendErrorResponse(eq(7L), 
any(ServerError.class),
                 eq(PulsarServerException.class.getCanonicalName() + ": 
Error"));
@@ -121,12 +263,146 @@ public class TopicListServiceTest {
                 lookupSemaphore);
         List<String> topics = 
Collections.singletonList("persistent://tenant/ns/topic1");
         topicListFuture.complete(topics);
+        
assertThat(topicListService.getWatcherFuture(13)).succeedsWithin(Duration.ofSeconds(2));
 
         CommandWatchTopicListClose watchTopicListClose = new 
CommandWatchTopicListClose()
                 .setRequestId(8)
                 .setWatcherId(13);
         topicListService.handleWatchTopicListClose(watchTopicListClose);
+
         
verify(topicResources).deregisterPersistentTopicListener(any(TopicListService.TopicListWatcher.class));
     }
 
+    // Stubs sendWatchTopicListSuccess so that its first two invocations report a permit acquire timeout
+    // through the error handler passed as the fifth argument, and the third one succeeds. Expects the
+    // watcher future to complete and exactly three invocations of the send.
+    @Test
+    public void testCommandWatchSuccessDirectMemoryAcquirePermitsRetries() {
+        topicListService.handleWatchTopicList(
+                NamespaceName.get("tenant/ns"),
+                13,
+                7,
+                "persistent://tenant/ns/topic\\d",
+                topicsPatternImplementation, null,
+                lookupSemaphore);
+        List<String> topics = 
Collections.singletonList("persistent://tenant/ns/topic1");
+        String hash = TopicList.calculateHash(topics);
+        AtomicInteger failureCount = new AtomicInteger(0);
+        doAnswer(invocationOnMock -> {
+            if (failureCount.incrementAndGet() < 3) {
+                Throwable failure = new 
AsyncSemaphore.PermitAcquireTimeoutException("Acquire timed out");
+                Function<Throwable, CompletableFuture<Void>> 
permitAcquireErrorHandler =
+                        invocationOnMock.getArgument(4);
+                return permitAcquireErrorHandler.apply(failure);
+            } else {
+                return CompletableFuture.completedFuture(null);
+            }
+        }).when(pulsarCommandSender).sendWatchTopicListSuccess(anyLong(), 
anyLong(), anyString(), any(), any());
+        topicListFuture.complete(topics);
+        
assertThat(topicListService.getWatcherFuture(13)).succeedsWithin(Duration.ofSeconds(2));
+        Collection<String> expectedTopics = new ArrayList<>(topics);
+        verify(connection.getCommandSender(), timeout(2000L).times(3))
+                .sendWatchTopicListSuccess(eq(7L), eq(13L), eq(hash), 
eq(expectedTopics), any());
+    }
+
+    // After the watcher has been created with topic1, injects a Created notification for topic2 and then a
+    // Deleted notification for topic1 through the captured metadata listener. Expects one update command
+    // per notification, carrying the hash of the topic list resulting from that change.
+    @Test
+    public void testCommandWatchUpdate() {
+        topicListService.handleWatchTopicList(
+                NamespaceName.get("tenant/ns"),
+                13,
+                7,
+                "persistent://tenant/ns/topic\\d",
+                topicsPatternImplementation, null,
+                lookupSemaphore);
+        List<String> topics = 
Collections.singletonList("persistent://tenant/ns/topic1");
+        topicListFuture.complete(topics);
+        
assertThat(topicListService.getWatcherFuture(13)).succeedsWithin(Duration.ofSeconds(2));
+
+        List<String> newTopics = 
Collections.singletonList("persistent://tenant/ns/topic2");
+        String hash = TopicList.calculateHash(ListUtils.union(topics, 
newTopics));
+        notificationConsumer.accept(
+                new Notification(NotificationType.Created, 
"/managed-ledgers/tenant/ns/persistent/topic2"));
+        verify(connection.getCommandSender(), timeout(2000L))
+                .sendWatchTopicListUpdate(eq(13L), eq(newTopics), any(), 
eq(hash), any());
+
+        // once topic1 is deleted only topic2 remains, hence the hash of newTopics
+        hash = TopicList.calculateHash(newTopics);
+        notificationConsumer.accept(
+                new Notification(NotificationType.Deleted, 
"/managed-ledgers/tenant/ns/persistent/topic1"));
+        verify(connection.getCommandSender(), timeout(2000L))
+                .sendWatchTopicListUpdate(eq(13L), eq(List.of()), eq(topics), 
eq(hash), any());
+    }
+
+    // Stubs sendWatchTopicListUpdate so that updates carrying new topics report a permit acquire timeout on
+    // their first two invocations. Injects a Created and then a Deleted notification for topic2 and verifies,
+    // in order, three invocations of the "created" update followed by one invocation of the "deleted" update.
+    @Test
+    public void testCommandWatchUpdateRetries() {
+        topicListService.handleWatchTopicList(
+                NamespaceName.get("tenant/ns"),
+                13,
+                7,
+                "persistent://tenant/ns/topic\\d",
+                topicsPatternImplementation, null,
+                lookupSemaphore);
+        List<String> topics = 
Collections.singletonList("persistent://tenant/ns/topic1");
+        topicListFuture.complete(topics);
+        
assertThat(topicListService.getWatcherFuture(13)).succeedsWithin(Duration.ofSeconds(2));
+
+        List<String> newTopics = 
Collections.singletonList("persistent://tenant/ns/topic2");
+        String hash = TopicList.calculateHash(ListUtils.union(topics, 
newTopics));
+        AtomicInteger failureCount = new AtomicInteger(0);
+        doAnswer(invocationOnMock -> {
+            List<String> newTopicsArg = invocationOnMock.getArgument(1);
+            // only updates that add topics are failed; the later delete update passes straight through
+            if (!newTopicsArg.isEmpty() && failureCount.incrementAndGet() < 3) 
{
+                Throwable failure = new 
AsyncSemaphore.PermitAcquireTimeoutException("Acquire timed out");
+                Function<Throwable, CompletableFuture<Void>> 
permitAcquireErrorHandler =
+                        invocationOnMock.getArgument(4);
+                return permitAcquireErrorHandler.apply(failure);
+            } else {
+                return CompletableFuture.completedFuture(null);
+            }
+        }).when(pulsarCommandSender).sendWatchTopicListUpdate(anyLong(), 
any(), any(), anyString(), any());
+        notificationConsumer.accept(
+                new Notification(NotificationType.Created, 
"/managed-ledgers/tenant/ns/persistent/topic2"));
+        notificationConsumer.accept(
+                new Notification(NotificationType.Deleted, 
"/managed-ledgers/tenant/ns/persistent/topic2"));
+        InOrder inOrder = inOrder(connection.getCommandSender());
+        inOrder.verify(connection.getCommandSender(), timeout(2000L).times(3))
+                .sendWatchTopicListUpdate(eq(13L), eq(newTopics), 
eq(List.of()), eq(hash), any());
+        inOrder.verify(connection.getCommandSender(), timeout(2000L).times(1))
+                .sendWatchTopicListUpdate(eq(13L), eq(List.of()), 
eq(newTopics), any(), any());
+    }
+
+    // Uses a service with an update queue size of 10 and a send stub whose future never completes, then
+    // injects 12 Created notifications (topic10 .. topic21). Expects a fresh topic listing to be requested
+    // and the watcher's matching topics to end up equal to the 100 topics returned by that listing.
+    @Test
+    public void testCommandWatchUpdateQueueOverflows() {
+        int topicListUpdateMaxQueueSize = 10;
+        topicListService = newTopicListService(topicListUpdateMaxQueueSize);
+        topicListService.handleWatchTopicList(
+                NamespaceName.get("tenant/ns"),
+                13,
+                7,
+                "persistent://tenant/ns/topic\\d+",
+                topicsPatternImplementation, null,
+                lookupSemaphore);
+        List<String> topics = 
Collections.singletonList("persistent://tenant/ns/topic1");
+        topicListFuture.complete(topics);
+        
assertThat(topicListService.getWatcherFuture(13)).succeedsWithin(Duration.ofSeconds(2));
+
+        // this future is never completed in the test, so update sends stay pending
+        CompletableFuture<Void> completePending = new CompletableFuture<>();
+        doReturn(completePending).when(pulsarCommandSender)
+                .sendWatchTopicListUpdate(anyLong(), any(), any(), 
anyString(), any());
+        // fresh future for the listing that is expected after the overflow
+        topicListFuture = new CompletableFuture<>();
+
+        // when the queue overflows
+        for (int i = 10; i <= 10 + topicListUpdateMaxQueueSize + 1; i++) {
+            notificationConsumer.accept(
+                    new Notification(NotificationType.Created, 
"/managed-ledgers/tenant/ns/persistent/topic" + i));
+        }
+
+        // a new listing should be performed. Return 100 topics in the 
response, simulating that events have been lost
+        List<String> updatedTopics = IntStream.range(1, 101).mapToObj(i -> 
"persistent://tenant/ns/topic" + i).toList();
+        topicListFuture.complete(updatedTopics);
+        // validate that the watcher's matching topics have been updated
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture<TopicListService.TopicListWatcher> watcherFuture 
= topicListService.getWatcherFuture(13);
+            assertThat(watcherFuture).isNotNull();
+            assertThat(watcherFuture.join().getMatchingTopics())
+                    .containsExactlyInAnyOrderElementsOf(updatedTopics);
+        });
+    }
+
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java
index 884cdc0ef92..46262e84d38 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java
@@ -18,12 +18,19 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.topics.TopicList;
 import org.apache.pulsar.common.topics.TopicsPattern;
 import org.apache.pulsar.common.topics.TopicsPatternFactory;
@@ -53,7 +60,9 @@ public class TopicListWatcherTest {
     @BeforeMethod(alwaysRun = true)
     public void setup() {
         topicListService = mock(TopicListService.class);
-        watcher = new TopicListService.TopicListWatcher(topicListService, ID, 
PATTERN, INITIAL_TOPIC_LIST);
+        watcher = new TopicListService.TopicListWatcher(topicListService, ID, 
NamespaceName.get("tenant", "ns"),
+                PATTERN, INITIAL_TOPIC_LIST, MoreExecutors.directExecutor(), 
9);
+        watcher.sendingCompleted();
     }
 
     @Test
@@ -71,8 +80,8 @@ public class TopicListWatcherTest {
         List<String> allMatchingTopics = Arrays.asList(
                 "persistent://tenant/ns/topic1", 
"persistent://tenant/ns/topic2", newTopic);
         String hash = TopicList.calculateHash(allMatchingTopics);
-        verify(topicListService).sendTopicListUpdate(ID, hash, 
Collections.emptyList(),
-                Collections.singletonList(newTopic));
+        verify(topicListService).sendTopicListUpdate(eq(ID), eq(hash), 
eq(Collections.emptyList()),
+                eq(Collections.singletonList(newTopic)), any());
         Assert.assertEquals(
                 allMatchingTopics,
                 watcher.getMatchingTopics());
@@ -85,8 +94,8 @@ public class TopicListWatcherTest {
 
         List<String> allMatchingTopics = 
Collections.singletonList("persistent://tenant/ns/topic2");
         String hash = TopicList.calculateHash(allMatchingTopics);
-        verify(topicListService).sendTopicListUpdate(ID, hash,
-                Collections.singletonList(deletedTopic), 
Collections.emptyList());
+        verify(topicListService).sendTopicListUpdate(eq(ID), eq(hash),
+                eq(Collections.singletonList(deletedTopic)), 
eq(Collections.emptyList()), any());
         Assert.assertEquals(
                 allMatchingTopics,
                 watcher.getMatchingTopics());
@@ -100,4 +109,15 @@ public class TopicListWatcherTest {
                 Arrays.asList("persistent://tenant/ns/topic1", 
"persistent://tenant/ns/topic2"),
                 watcher.getMatchingTopics());
     }
+
+    @Test
+    public void testUpdateQueueOverFlowPerformsFullUpdate() {
+        for (int i = 10; i <= 20; i++) {
+            String newTopic = "persistent://tenant/ns/topic" + i;
+            watcher.accept(newTopic, NotificationType.Created);
+        }
+        verify(topicListService).sendTopicListUpdate(anyLong(), anyString(), 
any(), any(), any());
+        verify(topicListService).updateTopicListWatcher(any());
+        verifyNoMoreInteractions(topicListService);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureMultipleConsumersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureMultipleConsumersTest.java
index b7f17b8ba3d..8e7f54fde54 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureMultipleConsumersTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureMultipleConsumersTest.java
@@ -256,7 +256,7 @@ public class 
PatternConsumerBackPressureMultipleConsumersTest extends MockedPuls
     }
 
     // Use this implementation when PIP-234 isn't available
-    private static class SharedClientResources implements AutoCloseable {
+    public static class SharedClientResources implements AutoCloseable {
         private final EventLoopGroup ioEventLoopGroup;
         private final ExecutorProvider internalExecutorProvider;
         private final ExecutorProvider externalExecutorProvider;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerTopicWatcherBackPressureMultipleConsumersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerTopicWatcherBackPressureMultipleConsumersTest.java
new file mode 100644
index 00000000000..e47c85d16d9
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerTopicWatcherBackPressureMultipleConsumersTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.IntStream;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiter;
+import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiterImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-impl")
+public class PatternConsumerTopicWatcherBackPressureMultipleConsumersTest 
extends MockedPulsarServiceBaseTest {
+
+    @Override
+    @BeforeMethod
+    protected void setup() throws Exception {
+        isTcpLookup = useTcpLookup();
+        super.internalSetup();
+        setupDefaultTenantAndNamespace();
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        conf.setSubscriptionPatternMaxLength(100);
+    }
+
+    protected boolean useTcpLookup() {
+        return true;
+    }
+
+    @Override
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test(timeOut = 60 * 1000)
+    public void 
testPatternConsumerWithLargeAmountOfConcurrentClientConnections()
+            throws PulsarAdminException, InterruptedException, IOException, 
ExecutionException, TimeoutException {
+        // create a new namespace for this test
+        String namespace = BrokerTestUtil.newUniqueName("public/ns");
+        admin.namespaces().createNamespace(namespace);
+
+        // use multiple clients so that each client has a separate connection 
to the broker
+        final int numberOfClients = 100;
+
+        // create a long topic name to consume more memory per topic
+        final String topicNamePrefix = "persistent://" + namespace + "/" + 
StringUtils.repeat('a', 512) + "-";
+        // number of topics to create
+        final int topicCount = 300;
+
+        // create topics
+        createTopics(topicCount, topicNamePrefix, "_0");
+
+        {
+            @Cleanup
+            
PatternConsumerBackPressureMultipleConsumersTest.SharedClientResources 
sharedClientResources =
+                    new 
PatternConsumerBackPressureMultipleConsumersTest.SharedClientResources();
+            List<PulsarClientImpl> clients = new ArrayList<>(numberOfClients);
+            @Cleanup
+            Closeable closeClients = () -> {
+                for (PulsarClient client : clients) {
+                    try {
+                        client.close();
+                    } catch (PulsarClientException e) {
+                        log.error("Failed to close client {}", client, e);
+                    }
+                }
+            };
+            for (int i = 0; i < numberOfClients; i++) {
+                ClientBuilderImpl clientBuilderImpl =
+                        (ClientBuilderImpl) PulsarClient.builder()
+                                .serviceUrl(getClientServiceUrl())
+                                .statsInterval(0, TimeUnit.SECONDS)
+                                .operationTimeout(1, TimeUnit.MINUTES);
+                PulsarClientImpl client = 
sharedClientResources.useSharedResources(
+                        
PulsarClientImpl.builder().conf(clientBuilderImpl.getClientConfigurationData())).build();
+                clients.add(client);
+            }
+
+            List<CompletableFuture<Consumer<String>>> consumerFutures = new 
ArrayList<>(numberOfClients);
+            for (int i = 0; i < topicCount; i++) {
+                String topicsPattern = namespace + "/a+-" + i + "_[01]$";
+                CompletableFuture<Consumer<String>> consumerFuture =
+                        clients.get(i % 
numberOfClients).newConsumer(Schema.STRING)
+                                
.topicsPattern(topicsPattern).subscriptionName("sub" + i)
+                                .subscribeAsync();
+                consumerFutures.add(consumerFuture);
+                consumerFuture.exceptionally(throwable -> {
+                    log.error("Failed to subscribe to pattern {}", 
topicsPattern, throwable);
+                    return null;
+                });
+            }
+
+            FutureUtil.waitForAll(consumerFutures).get(60, TimeUnit.SECONDS);
+
+            List<Consumer<String>> consumers = 
consumerFutures.stream().map(CompletableFuture::join).toList();
+
+            List<? extends CompletableFuture<?>> watcherFutures = 
consumers.stream().map(consumer -> {
+                try {
+                    CompletableFuture<?> watcherFuture = consumer instanceof 
PatternMultiTopicsConsumerImpl
+                            ? (CompletableFuture<?>) 
FieldUtils.readField(consumer, "watcherFuture", true) : null;
+                    return watcherFuture;
+                } catch (IllegalAccessException e) {
+                    throw new RuntimeException(e);
+                }
+            }).filter(Objects::nonNull).toList();
+
+            // wait for all watcher futures to complete
+            FutureUtil.waitForAll(watcherFutures).get(60, TimeUnit.SECONDS);
+
+            PulsarClientImpl client = clients.get(0);
+            sendAndValidate(topicCount, client, consumers, topicNamePrefix, 
"_0");
+
+            // create additional topics
+            createTopics(topicCount, topicNamePrefix, "_1");
+
+            // send a message to each of the additional topics
+            sendAndValidate(topicCount, client, consumers, topicNamePrefix, 
"_1");
+        }
+
+        validateThatTokensHaventLeakedOrIncreased();
+    }
+
+    protected void validateThatTokensHaventLeakedOrIncreased() {
+        AsyncDualMemoryLimiterImpl limiter =
+                pulsar.getBrokerService().getMaxTopicListInFlightLimiter();
+        
assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY).getAvailablePermits())
+                
.isEqualTo(pulsar.getConfiguration().getMaxTopicListInFlightHeapMemSizeMB() * 
1024L * 1024);
+        
assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY).getAcquiredPermits())
+                .isEqualTo(0);
+        
assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.DIRECT_MEMORY).getAvailablePermits())
+                
.isEqualTo(pulsar.getConfiguration().getMaxTopicListInFlightDirectMemSizeMB() * 
1024L * 1024);
+        
assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.DIRECT_MEMORY).getAcquiredPermits())
+                .isEqualTo(0);
+    }
+
+    private void createTopics(int topicCount, String topicNamePrefix, String 
topicNameSuffix)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        List<CompletableFuture<Void>> createTopicFutures = IntStream.range(0, 
topicCount)
+                .mapToObj(i -> 
admin.topics().createNonPartitionedTopicAsync(topicNamePrefix + i + 
topicNameSuffix))
+                .toList();
+        // wait for all topics to be created
+        FutureUtil.waitForAll(createTopicFutures).get(30, TimeUnit.SECONDS);
+    }
+
+    private static void sendAndValidate(int topicCount, PulsarClientImpl 
client, List<Consumer<String>> consumers,
+                                        String topicNamePrefix,
+                                        String topicNameSuffix) throws 
PulsarClientException {
+        for (int i = 0; i < topicCount; i++) {
+            // send message to every topic
+            Producer<String> producer =
+                    client.newProducer(Schema.STRING).topic(topicNamePrefix + 
i + topicNameSuffix).create();
+            producer.send("test" + i);
+            producer.close();
+        }
+
+        // validate that every consumer receives a single message
+        for (int i = 0; i < consumers.size(); i++) {
+            Consumer<String> consumer = consumers.get(i);
+            int finalI = i;
+            assertThat(consumer.receive(10, TimeUnit.SECONDS)).isNotNull()
+                    .satisfies(message -> assertThat(message.getValue())
+                            .isEqualTo("test" + finalI));
+            // validate that no more messages are received
+            assertThat(consumer.receive(1, TimeUnit.MICROSECONDS)).isNull();
+        }
+    }
+
+    protected String getClientServiceUrl() {
+        return lookupUrl.toString();
+    }
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index e0aac72c1a3..c6912ae4fe1 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -31,6 +31,7 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Base64;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -1613,7 +1614,7 @@ public class Commands {
      * @param topics topic names which are matching, the topic name contains 
the partition suffix.
      */
     public static BaseCommand newWatchTopicListSuccess(long requestId, long 
watcherId, String topicsHash,
-                                                       List<String> topics) {
+                                                       Collection<String> 
topics) {
         BaseCommand cmd = new 
BaseCommand().setType(Type.WATCH_TOPIC_LIST_SUCCESS);
         cmd.setWatchTopicListSuccess()
                 .setRequestId(requestId)
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java
index 7a707f8b5e5..58603851662 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java
@@ -175,5 +175,11 @@ public class AsyncDualMemoryLimiterImpl implements 
AsyncDualMemoryLimiter, AutoC
         public AsyncSemaphore.AsyncSemaphorePermit getUnderlyingPermit() {
             return underlyingPermit;
         }
+
+        @Override
+        public String toString() {
+            return "DualMemoryLimiterPermit@" + System.identityHashCode(this) 
+ "{" + "limitType=" + limitType
+                    + ", permits=" + underlyingPermit.getPermits() + '}';
+        }
     }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterUtil.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterUtil.java
index 616681dc155..9f3880a2c09 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterUtil.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterUtil.java
@@ -93,17 +93,17 @@ public class AsyncDualMemoryLimiterUtil {
                                                                                
              dualMemoryLimiter,
                                                                                
      BooleanSupplier isCancelled,
                                                                                
      BaseCommand command,
-                                                                               
      Consumer<Throwable>
+                                                                               
      Function<Throwable,
+                                                                               
              CompletableFuture<Void>>
                                                                                
              permitAcquireErrorHandler
                                                                                
      ) {
         // Calculate serialized size before acquiring permits
         int serializedSize = command.getSerializedSize();
         // Acquire permits
         return dualMemoryLimiter.acquire(serializedSize, 
AsyncDualMemoryLimiter.LimitType.DIRECT_MEMORY, isCancelled)
-                .whenComplete((permits, t) -> {
+                .handle((permits, t) -> {
                     if (t != null) {
-                        permitAcquireErrorHandler.accept(t);
-                        return;
+                        return permitAcquireErrorHandler.apply(t);
                     }
                     try {
                         // Serialize the response
@@ -118,6 +118,7 @@ public class AsyncDualMemoryLimiterUtil {
                         dualMemoryLimiter.release(permits);
                         throw e;
                     }
-                }).thenApply(__ -> null);
+                    return CompletableFuture.<Void>completedFuture(null);
+                }).thenCompose(Function.identity());
     }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java
index 6df47256ee3..6ee9028711f 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java
@@ -359,5 +359,10 @@ public class AsyncSemaphoreImpl implements AsyncSemaphore, 
AutoCloseable {
         public long releasePermits() {
             return PERMITS_UPDATER.getAndSet(this, 0);
         }
+
+        @Override
+        public String toString() {
+            return "SemaphorePermit@" + System.identityHashCode(this) + 
"[permits=" + permits + "]";
+        }
     }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
index 629aedb3e72..0e38f022472 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
@@ -27,7 +27,9 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collector;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import lombok.experimental.UtilityClass;
 import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -56,6 +58,22 @@ public class TopicList {
      * Filter topics using a TopicListPattern instance.
      */
     public static List<String> filterTopics(List<String> original, 
TopicsPattern topicsPattern) {
+        return filterTopics(original, topicsPattern, Collectors.toList());
+    }
+
+    /**
+     * Filter topics using a TopicsPattern instance and collect the results 
using a specified collector.
+     */
+    public static <R> R filterTopics(List<String> original, TopicsPattern 
topicsPattern,
+                                              Collector<String, ?, R> 
collector) {
+        return filterTopicsToStream(original, topicsPattern)
+                .collect(collector);
+    }
+
+    /**
+     * Filter topics using a TopicsPattern instance and return a stream of 
filtered topic names.
+     */
+    public static Stream<String> filterTopicsToStream(List<String> original, 
TopicsPattern topicsPattern) {
         return original.stream()
                 .map(TopicName::get)
                 .filter(topicName -> {
@@ -63,8 +81,7 @@ public class TopicList {
                     String removedScheme = 
SCHEME_SEPARATOR_PATTERN.split(partitionedTopicName)[1];
                     return topicsPattern.matches(removedScheme);
                 })
-                .map(TopicName::toString)
-                .collect(Collectors.toList());
+                .map(TopicName::toString);
     }
 
     public static List<String> filterSystemTopic(List<String> original) {
@@ -73,7 +90,7 @@ public class TopicList {
                 .collect(Collectors.toList());
     }
 
-    public static String calculateHash(List<String> topics) {
+    public static String calculateHash(Collection<String> topics) {
         Hasher hasher = Hashing.crc32c().newHasher();
         String[] sortedTopics = topics.toArray(new String[topics.size()]);
         Arrays.sort(sortedTopics);
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterUtilTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterUtilTest.java
index 86d155353df..e07d9410b47 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterUtilTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterUtilTest.java
@@ -262,7 +262,10 @@ public class AsyncDualMemoryLimiterUtilTest {
                 limiter,
                 () -> false,
                 command,
-                throwable -> errorHandlerCalled.set(true)
+                throwable -> {
+                    errorHandlerCalled.set(true);
+                    return FutureUtil.failedFuture(throwable);
+                }
         );
 
         result.get(1, TimeUnit.SECONDS);
@@ -297,6 +300,7 @@ public class AsyncDualMemoryLimiterUtilTest {
                 throwable -> {
                     errorHandlerCalled.set(true);
                     capturedError.set(throwable);
+                    return FutureUtil.failedFuture(throwable);
                 }
         );
 
@@ -323,7 +327,10 @@ public class AsyncDualMemoryLimiterUtilTest {
                 limiter,
                 () -> false,
                 command,
-                throwable -> errorHandlerCalled.set(true)
+                throwable -> {
+                    errorHandlerCalled.set(true);
+                    return FutureUtil.failedFuture(throwable);
+                }
         );
 
         try {
@@ -351,7 +358,10 @@ public class AsyncDualMemoryLimiterUtilTest {
                 limiter,
                 () -> false,
                 command,
-                throwable -> errorHandlerCalled.set(true)
+                throwable -> {
+                    errorHandlerCalled.set(true);
+                    return FutureUtil.failedFuture(throwable);
+                }
         );
 
         try {
@@ -382,7 +392,10 @@ public class AsyncDualMemoryLimiterUtilTest {
                 limiter,
                 cancelled::get,
                 command,
-                throwable -> errorHandlerCalled.set(true)
+                throwable -> {
+                    errorHandlerCalled.set(true);
+                    return FutureUtil.failedFuture(throwable);
+                }
         );
 
         // Cancel the request
@@ -420,6 +433,7 @@ public class AsyncDualMemoryLimiterUtilTest {
                     () -> false,
                     command,
                     throwable -> {
+                        return FutureUtil.failedFuture(throwable);
                     }
             );
         }
@@ -453,7 +467,10 @@ public class AsyncDualMemoryLimiterUtilTest {
                 limiter,
                 () -> false,
                 command,
-                throwable -> errorHandlerCalled.set(true)
+                throwable -> {
+                    errorHandlerCalled.set(true);
+                    return FutureUtil.failedFuture(throwable);
+                }
         );
 
         result.get(1, TimeUnit.SECONDS);
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index d276c7996a6..b0bd26bbcdf 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -421,6 +421,7 @@ public class LookupProxyHandler {
                             clientAddress, t.getMessage());
                     writeAndFlush(Commands.newError(clientRequestId, 
ServerError.TooManyRequests,
                             "Failed due to direct memory limit exceeded"));
+                    return CompletableFuture.completedFuture(null);
                 });
     }
 


Reply via email to