This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d9f564076a8 [feat][broker] PIP-145: Notifications for faster topic discovery (#16062)
d9f564076a8 is described below

commit d9f564076a85ce108de16b1d350765605509bd1e
Author: Andras Beni <andras.b...@streamnative.io>
AuthorDate: Fri Jul 1 23:07:26 2022 +0200

    [feat][broker] PIP-145: Notifications for faster topic discovery (#16062)
    
    * PIP-145: Notifications for faster topic discovery
    
    This commit introduces topic list watchers. By using these objects,
    clients can observe the creation or deletion of topics closer to
    real time. This reduces latency in consuming the first messages
    published to a topic when using a pattern-based subscription.
    
    Modifications:
    - New commands were added to the binary protocol to enable registering
     and deregistering watchers.
    - Pattern-based consumers create TopicListWatcher objects if the broker
      supports this feature. Otherwise, they fall back to polling only.
    - The watchers use ConnectionHandler to obtain a connection to a broker.
    - Once connected, watchers register and wait for updates.
    - ServerCnx uses the newly created TopicListService to manage watchers.
    - TopicListService listens to metadata notifications and sends updates.
    
    * Fix checkstyle violation
    
    * Fix cpp client compile error
    
    * Remove unused code, remove failed watchers
    
    * Rename command Unwatch, extract fields from command to avoid concurrent modification
    
    * Fix cpp client compile error
    
    * Fix cpp client compile error
    
    Co-authored-by: Matteo Merli <mme...@apache.org>
---
 .../pulsar/broker/resources/TopicResources.java    |  43 ++++
 .../broker/resources/TopicResourcesTest.java       | 109 ++++++++
 .../pulsar/broker/service/PulsarCommandSender.java |   7 +-
 .../broker/service/PulsarCommandSenderImpl.java    |  24 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |  70 +++++-
 .../pulsar/broker/service/TopicListService.java    | 251 +++++++++++++++++++
 .../pulsar/broker/service/ServerCnxTest.java       |  23 ++
 .../broker/service/TopicListServiceTest.java       | 132 ++++++++++
 .../broker/service/TopicListWatcherTest.java       | 102 ++++++++
 .../broker/service/utils/ClientChannelHelper.java  |   6 +
 .../apache/pulsar/client/api/ClientErrorsTest.java |   4 +-
 .../pulsar/client/api/MockBrokerService.java       |   2 +-
 pulsar-client-cpp/lib/Commands.cc                  |  12 +
 .../client/impl/BinaryProtoLookupService.java      |   5 +
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  71 ++++++
 .../pulsar/client/impl/ConnectionHandler.java      |  10 +-
 .../org/apache/pulsar/client/impl/HttpClient.java  |   4 +
 .../pulsar/client/impl/HttpLookupService.java      |   5 +
 .../apache/pulsar/client/impl/LookupService.java   |   7 +
 .../impl/PatternMultiTopicsConsumerImpl.java       |  31 ++-
 .../pulsar/client/impl/PulsarClientImpl.java       |  10 +
 .../pulsar/client/impl/TopicListWatcher.java       | 278 +++++++++++++++++++++
 .../apache/pulsar/client/impl/ClientCnxTest.java   | 109 ++++++++
 .../pulsar/client/impl/PulsarClientImplTest.java   |   2 +-
 .../pulsar/client/impl/TopicListWatcherTest.java   | 111 ++++++++
 .../apache/pulsar/common/protocol/Commands.java    |  62 ++++-
 .../pulsar/common/protocol/PulsarDecoder.java      |  42 ++++
 .../org/apache/pulsar/common/topics/TopicList.java |   2 +
 pulsar-common/src/main/proto/PulsarApi.proto       |  40 +++
 .../pulsar/proxy/server/ProxyConnection.java       |   5 +-
 30 files changed, 1555 insertions(+), 24 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
index bea67943f82..ee537431280 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
@@ -19,22 +19,35 @@
 package org.apache.pulsar.broker.resources;
 
 import static org.apache.pulsar.common.util.Codec.decode;
+import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
 
 public class TopicResources {
     private static final String MANAGED_LEDGER_PATH = "/managed-ledgers";
 
     private final MetadataStore store;
 
+    private final Map<BiConsumer<String, NotificationType>, Pattern> 
topicListeners;
+
     public TopicResources(MetadataStore store) {
         this.store = store;
+        topicListeners = new ConcurrentHashMap<>();
+        store.registerListener(this::handleNotification);
     }
 
     public CompletableFuture<List<String>> 
listPersistentTopicsAsync(NamespaceName ns) {
@@ -110,4 +123,34 @@ public class TopicResources {
                     }
                 });
     }
+
+    void handleNotification(Notification notification) {
+        if (notification.getPath().startsWith(MANAGED_LEDGER_PATH)
+                && EnumSet.of(NotificationType.Created, 
NotificationType.Deleted).contains(notification.getType())) {
+            for (Map.Entry<BiConsumer<String, NotificationType>, Pattern> 
entry :
+                    new HashMap<>(topicListeners).entrySet()) {
+                Matcher matcher = 
entry.getValue().matcher(notification.getPath());
+                if (matcher.matches()) {
+                    TopicName topicName = TopicName.get(
+                            matcher.group(2), 
NamespaceName.get(matcher.group(1)), matcher.group(3));
+                    entry.getKey().accept(topicName.toString(), 
notification.getType());
+                }
+            }
+        }
+    }
+
+    Pattern namespaceNameToTopicNamePattern(NamespaceName namespaceName) {
+        return Pattern.compile(
+                MANAGED_LEDGER_PATH + "/(" + namespaceName + ")/(" + 
TopicDomain.persistent + ")/(" + "[^/]+)");
+    }
+
+    public void registerPersistentTopicListener(
+            NamespaceName namespaceName, BiConsumer<String, NotificationType> 
listener) {
+        topicListeners.put(listener, 
namespaceNameToTopicNamePattern(namespaceName));
+    }
+
+    public void deregisterPersistentTopicListener(BiConsumer<String, 
NotificationType> listener) {
+        topicListeners.remove(listener);
+    }
+
 }
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/TopicResourcesTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/TopicResourcesTest.java
new file mode 100644
index 00000000000..cfafc5107d2
--- /dev/null
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/TopicResourcesTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resources;
+
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import java.util.function.BiConsumer;
+
+public class TopicResourcesTest {
+
+    private MetadataStore metadataStore;
+    private TopicResources topicResources;
+
+    @BeforeMethod
+    public void setup() {
+        metadataStore = mock(MetadataStore.class);
+        topicResources = new TopicResources(metadataStore);
+    }
+
+    @Test
+    public void testConstructorRegistersAsListener() {
+        verify(metadataStore).registerListener(any());
+    }
+
+    @Test
+    public void testListenerInvokedWhenTopicCreated() {
+        BiConsumer<String, NotificationType> listener = mock(BiConsumer.class);
+        
topicResources.registerPersistentTopicListener(NamespaceName.get("tenant/namespace"),
 listener);
+        topicResources.handleNotification(new 
Notification(NotificationType.Created, 
"/managed-ledgers/tenant/namespace/persistent/topic"));
+        verify(listener).accept("persistent://tenant/namespace/topic", 
NotificationType.Created);
+    }
+
+    @Test
+    public void testListenerInvokedWhenTopicV1Created() {
+        BiConsumer<String, NotificationType> listener = mock(BiConsumer.class);
+        
topicResources.registerPersistentTopicListener(NamespaceName.get("tenant/cluster/namespace"),
 listener);
+        topicResources.handleNotification(new 
Notification(NotificationType.Created, 
"/managed-ledgers/tenant/cluster/namespace/persistent/topic"));
+        verify(listener).accept("persistent://tenant/cluster/namespace/topic", 
NotificationType.Created);
+    }
+
+    @Test
+    public void testListenerInvokedWhenTopicDeleted() {
+        BiConsumer<String, NotificationType> listener = mock(BiConsumer.class);
+        
topicResources.registerPersistentTopicListener(NamespaceName.get("tenant/namespace"),
 listener);
+        topicResources.handleNotification(new 
Notification(NotificationType.Deleted, 
"/managed-ledgers/tenant/namespace/persistent/topic"));
+        verify(listener).accept("persistent://tenant/namespace/topic", 
NotificationType.Deleted);
+    }
+
+    @Test
+    public void testListenerNotInvokedWhenSubscriptionCreated() {
+        BiConsumer<String, NotificationType> listener = mock(BiConsumer.class);
+        
topicResources.registerPersistentTopicListener(NamespaceName.get("tenant/namespace"),
 listener);
+        topicResources.handleNotification(new 
Notification(NotificationType.Created, 
"/managed-ledgers/tenant/namespace/persistent/topic/subscription"));
+        verifyNoInteractions(listener);
+    }
+
+    @Test
+    public void testListenerNotInvokedWhenTopicCreatedInOtherNamespace() {
+        BiConsumer<String, NotificationType> listener = mock(BiConsumer.class);
+        
topicResources.registerPersistentTopicListener(NamespaceName.get("tenant/namespace"),
 listener);
+        topicResources.handleNotification(new 
Notification(NotificationType.Created, 
"/managed-ledgers/tenant/namespace2/persistent/topic"));
+        verifyNoInteractions(listener);
+    }
+
+    @Test
+    public void testListenerNotInvokedWhenTopicModified() {
+        BiConsumer<String, NotificationType> listener = mock(BiConsumer.class);
+        
topicResources.registerPersistentTopicListener(NamespaceName.get("tenant/namespace"),
 listener);
+        topicResources.handleNotification(new 
Notification(NotificationType.Modified, 
"/managed-ledgers/tenant/namespace/persistent/topic"));
+        verifyNoInteractions(listener);
+    }
+
+    @Test
+    public void testListenerNotInvokedAfterDeregistered() {
+        BiConsumer<String, NotificationType> listener = mock(BiConsumer.class);
+        
topicResources.registerPersistentTopicListener(NamespaceName.get("tenant/namespace"),
 listener);
+        topicResources.handleNotification(new 
Notification(NotificationType.Created, 
"/managed-ledgers/tenant/namespace/persistent/topic"));
+        verify(listener).accept("persistent://tenant/namespace/topic", 
NotificationType.Created);
+        topicResources.deregisterPersistentTopicListener(listener);
+        topicResources.handleNotification(new 
Notification(NotificationType.Created, 
"/managed-ledgers/tenant/namespace/persistent/topic2"));
+        verifyNoMoreInteractions(listener);
+    }
+
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
index b267f725986..8934d2e6ce6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
@@ -61,7 +61,7 @@ public interface PulsarCommandSender {
 
     void sendGetOrCreateSchemaErrorResponse(long requestId, ServerError error, 
String errorMessage);
 
-    void sendConnectedResponse(int clientProtocolVersion, int maxMessageSize);
+    void sendConnectedResponse(int clientProtocolVersion, int maxMessageSize, 
boolean supportsTopicWatchers);
 
     void sendLookupResponse(String brokerServiceUrl, String 
brokerServiceUrlTls, boolean authoritative,
                             CommandLookupTopicResponse.LookupType response, 
long requestId,
@@ -92,4 +92,9 @@ public interface PulsarCommandSender {
     void sendEndTxnResponse(long requestId, TxnID txnID, int txnAction);
 
     void sendEndTxnErrorResponse(long requestId, TxnID txnID, ServerError 
error, String message);
+
+    void sendWatchTopicListSuccess(long requestId, long watcherId, String 
topicsHash, List<String> topics);
+
+    void sendWatchTopicListUpdate(long watcherId,
+                                         List<String> newTopics, List<String> 
deletedTopics, String topicsHash);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index 57dcaff250d..5b5af41d1ef 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -162,8 +162,9 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
     }
 
     @Override
-    public void sendConnectedResponse(int clientProtocolVersion, int 
maxMessageSize) {
-        BaseCommand command = 
Commands.newConnectedCommand(clientProtocolVersion, maxMessageSize);
+    public void sendConnectedResponse(int clientProtocolVersion, int 
maxMessageSize, boolean supportsTopicWatchers) {
+        BaseCommand command = Commands.newConnectedCommand(
+                clientProtocolVersion, maxMessageSize, supportsTopicWatchers);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
         cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
@@ -346,6 +347,25 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
         }
     }
 
+    @Override
+    public void sendWatchTopicListSuccess(long requestId, long watcherId, 
String topicsHash, List<String> topics) {
+        BaseCommand command = Commands.newWatchTopicListSuccess(requestId, 
watcherId, topicsHash, topics);
+        interceptAndWriteCommand(command);
+    }
+
+    @Override
+    public void sendWatchTopicListUpdate(long watcherId,
+                                         List<String> newTopics, List<String> 
deletedTopics, String topicsHash) {
+        BaseCommand command = Commands.newWatchTopicUpdate(watcherId, 
newTopics, deletedTopics, topicsHash);
+        interceptAndWriteCommand(command);
+    }
+
+    private void interceptAndWriteCommand(BaseCommand command) {
+        safeIntercept(command, cnx);
+        ByteBuf outBuf = Commands.serializeWithSize(command);
+        cnx.ctx().writeAndFlush(outBuf);
+    }
+
     private void safeIntercept(BaseCommand command, ServerCnx cnx) {
         try {
             this.interceptor.onPulsarCommand(command, cnx);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4f8763330d3..87b6c4208ae 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -50,6 +50,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
@@ -119,6 +120,8 @@ import 
org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.CommandTcClientConnectRequest;
 import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
+import org.apache.pulsar.common.api.proto.CommandWatchTopicList;
+import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
 import org.apache.pulsar.common.api.proto.FeatureFlags;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
 import org.apache.pulsar.common.api.proto.KeySharedMode;
@@ -164,6 +167,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
     private final boolean enableSubscriptionPatternEvaluation;
     private final int maxSubscriptionPatternLength;
+    private final TopicListService topicListService;
     private State state;
     private volatile boolean isActive = true;
     private String authRole = null;
@@ -272,6 +276,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         this.connectionController = new 
ConnectionController.DefaultConnectionController(conf);
         this.enableSubscriptionPatternEvaluation = 
conf.isEnableBrokerSideSubscriptionPatternEvaluation();
         this.maxSubscriptionPatternLength = 
conf.getSubscriptionPatternMaxLength();
+        this.topicListService = new TopicListService(pulsar, this,
+                enableSubscriptionPatternEvaluation, 
maxSubscriptionPatternLength);
     }
 
     @Override
@@ -334,6 +340,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 }
             }
         });
+        this.topicListService.inactivate();
         this.service.getPulsarStats().recordConnectionClose();
     }
 
@@ -625,8 +632,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     }
 
     // complete the connect and sent newConnected command
-    private void completeConnect(int clientProtoVersion, String clientVersion) 
{
-        ctx.writeAndFlush(Commands.newConnected(clientProtoVersion, 
maxMessageSize));
+    private void completeConnect(int clientProtoVersion, String clientVersion, 
boolean supportsTopicWatchers) {
+        ctx.writeAndFlush(Commands.newConnected(clientProtoVersion, 
maxMessageSize, supportsTopicWatchers));
         state = State.Connected;
         service.getPulsarStats().recordConnectionCreateSuccess();
         if (log.isDebugEnabled()) {
@@ -685,7 +692,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
             if (state != State.Connected) {
                 // First time authentication is done
-                completeConnect(clientProtocolVersion, clientVersion);
+                completeConnect(clientProtocolVersion, clientVersion, 
enableSubscriptionPatternEvaluation);
             } else {
                 // If the connection was already ready, it means we're doing a 
refresh
                 if (!StringUtils.isEmpty(authRole)) {
@@ -792,7 +799,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         }
 
         if (!service.isAuthenticationEnabled()) {
-            completeConnect(clientProtocolVersion, clientVersion);
+            completeConnect(clientProtocolVersion, clientVersion, 
enableSubscriptionPatternEvaluation);
             return;
         }
 
@@ -820,7 +827,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 authRole = 
getBrokerService().getAuthenticationService().getAnonymousUserRole()
                     .orElseThrow(() ->
                         new AuthenticationException("No anonymous role, and no 
authentication provider configured"));
-                completeConnect(clientProtocolVersion, clientVersion);
+                completeConnect(clientProtocolVersion, clientVersion, 
enableSubscriptionPatternEvaluation);
                 return;
             }
 
@@ -2527,6 +2534,59 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 }));
     }
 
+    protected void handleCommandWatchTopicList(CommandWatchTopicList 
commandWatchTopicList) {
+        final long requestId = commandWatchTopicList.getRequestId();
+        final long watcherId = commandWatchTopicList.getWatcherId();
+        final NamespaceName namespaceName = 
NamespaceName.get(commandWatchTopicList.getNamespace());
+
+        Pattern topicsPattern = 
Pattern.compile(commandWatchTopicList.hasTopicsPattern()
+                ? commandWatchTopicList.getTopicsPattern() : 
TopicList.ALL_TOPICS_PATTERN);
+        String topicsHash = commandWatchTopicList.hasTopicsHash()
+                ? commandWatchTopicList.getTopicsHash() : null;
+
+        final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
+        if (lookupSemaphore.tryAcquire()) {
+            if (invalidOriginalPrincipal(originalPrincipal)) {
+                final String msg = "Valid Proxy Client role should be provided 
for watchTopicListRequest ";
+                log.warn("[{}] {} with role {} and proxyClientAuthRole {} on 
namespace {}", remoteAddress, msg,
+                        authRole, originalPrincipal, namespaceName);
+                commandSender.sendErrorResponse(watcherId, 
ServerError.AuthorizationError, msg);
+                lookupSemaphore.release();
+                return;
+            }
+            isNamespaceOperationAllowed(namespaceName, 
NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
+                if (isAuthorized) {
+                    topicListService.handleWatchTopicList(namespaceName, 
watcherId, requestId, topicsPattern,
+                            topicsHash, lookupSemaphore);
+                } else {
+                    final String msg = "Proxy Client is not authorized to 
watchTopicList";
+                    log.warn("[{}] {} with role {} on namespace {}", 
remoteAddress, msg, getPrincipal(), namespaceName);
+                    commandSender.sendErrorResponse(requestId, 
ServerError.AuthorizationError, msg);
+                    lookupSemaphore.release();
+                }
+                return null;
+            }).exceptionally(ex -> {
+                logNamespaceNameAuthException(remoteAddress, "watchTopicList", 
getPrincipal(),
+                        Optional.of(namespaceName), ex);
+                final String msg = "Exception occurred while trying to handle 
command WatchTopicList";
+                commandSender.sendErrorResponse(requestId, 
ServerError.AuthorizationError, msg);
+                lookupSemaphore.release();
+                return null;
+            });
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed WatchTopicList due to too many 
lookup-requests {}", remoteAddress,
+                        namespaceName);
+            }
+            commandSender.sendErrorResponse(requestId, 
ServerError.TooManyRequests,
+                    "Failed due to too many pending lookup requests");
+        }
+    }
+
+    protected void handleCommandWatchTopicListClose(CommandWatchTopicListClose 
commandWatchTopicListClose) {
+        topicListService.handleWatchTopicListClose(commandWatchTopicListClose);
+    }
+
     @Override
     protected boolean isHandshakeCompleted() {
         return state == State.Connected;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
new file mode 100644
index 00000000000..78ee45223eb
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+import java.util.function.BiConsumer;
+import java.util.regex.Pattern;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.resources.TopicResources;
+import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
+import org.apache.pulsar.common.api.proto.ServerError;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.topics.TopicList;
+import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicListService {
+
+
+    public static class TopicListWatcher implements BiConsumer<String, 
NotificationType> {
+
+        private final List<String> matchingTopics;
+        private final TopicListService topicListService;
+        private final long id;
+        private final Pattern topicsPattern;
+
+        public TopicListWatcher(TopicListService topicListService, long id,
+                                Pattern topicsPattern, List<String> topics) {
+            this.topicListService = topicListService;
+            this.id = id;
+            this.topicsPattern = topicsPattern;
+            this.matchingTopics = TopicList.filterTopics(topics, 
topicsPattern);
+        }
+
+        public List<String> getMatchingTopics() {
+            return matchingTopics;
+        }
+
+        @Override
+        public void accept(String topicName, NotificationType 
notificationType) {
+            if (topicsPattern.matcher(topicName).matches()) {
+                List<String> newTopics;
+                List<String> deletedTopics;
+                if (notificationType == NotificationType.Deleted) {
+                    newTopics = Collections.emptyList();
+                    deletedTopics = Collections.singletonList(topicName);
+                    matchingTopics.remove(topicName);
+                } else {
+                    deletedTopics = Collections.emptyList();
+                    newTopics = Collections.singletonList(topicName);
+                    matchingTopics.add(topicName);
+                }
+                String hash = TopicList.calculateHash(matchingTopics);
+                topicListService.sendTopicListUpdate(id, hash, deletedTopics, 
newTopics);
+            }
+        }
+    }
+
+
+    private static final Logger log = 
LoggerFactory.getLogger(TopicListService.class);
+
+    private final NamespaceService namespaceService;
+    private final TopicResources topicResources;
+    private final ServerCnx connection;
+    private final boolean enableSubscriptionPatternEvaluation;
+    private final int maxSubscriptionPatternLength;
+    private final ConcurrentLongHashMap<CompletableFuture<TopicListWatcher>> 
watchers;
+
+
+    public TopicListService(PulsarService pulsar, ServerCnx connection,
+                            boolean enableSubscriptionPatternEvaluation, int 
maxSubscriptionPatternLength) {
+        this.namespaceService = pulsar.getNamespaceService();
+        this.connection = connection;
+        this.enableSubscriptionPatternEvaluation = 
enableSubscriptionPatternEvaluation;
+        this.maxSubscriptionPatternLength = maxSubscriptionPatternLength;
+        this.watchers = 
ConcurrentLongHashMap.<CompletableFuture<TopicListWatcher>>newBuilder()
+                .expectedItems(8)
+                .concurrencyLevel(1)
+                .build();
+        this.topicResources = pulsar.getPulsarResources().getTopicResources();
+    }
+
+    public void inactivate() {
+        for (Long watcherId : new HashSet<>(watchers.keys())) {
+            deleteTopicListWatcher(watcherId);
+        }
+    }
+
+    public void handleWatchTopicList(NamespaceName namespaceName, long 
watcherId, long requestId, Pattern topicsPattern,
+                                     String topicsHash, Semaphore 
lookupSemaphore) {
+
+        if (!enableSubscriptionPatternEvaluation || 
topicsPattern.pattern().length() > maxSubscriptionPatternLength) {
+            String msg = "Unable to create topic list watcher: ";
+            if (!enableSubscriptionPatternEvaluation) {
+                msg += "Evaluating subscription patterns is disabled.";
+            } else {
+                msg += "Pattern longer than maximum: " + 
maxSubscriptionPatternLength;
+            }
+            log.warn("[{}] {} on namespace {}", connection.getRemoteAddress(), 
msg, namespaceName);
+            connection.getCommandSender().sendErrorResponse(requestId, 
ServerError.NotAllowedError, msg);
+            lookupSemaphore.release();
+            return;
+        }
+        CompletableFuture<TopicListWatcher> watcherFuture = new 
CompletableFuture<>();
+        CompletableFuture<TopicListWatcher> existingWatcherFuture = 
watchers.putIfAbsent(watcherId, watcherFuture);
+
+        if (existingWatcherFuture != null) {
+            if (existingWatcherFuture.isDone() && 
!existingWatcherFuture.isCompletedExceptionally()) {
+                TopicListWatcher watcher = existingWatcherFuture.getNow(null);
+                log.info("[{}] Watcher with the same id is already created:"
+                                + " watcherId={}, watcher={}",
+                        connection.getRemoteAddress(), watcherId, watcher);
+                watcherFuture = existingWatcherFuture;
+            } else {
+                // There was an early request to create a watcher with the 
same watcherId. This can happen when
+                // client timeout is lower the broker timeouts. We need to 
wait until the previous watcher
+                // creation request either completes or fails.
+                log.warn("[{}] Watcher with id is already present on the 
connection,"
+                        + " consumerId={}", connection.getRemoteAddress(), 
watcherId);
+                ServerError error;
+                if (!existingWatcherFuture.isDone()) {
+                    error = ServerError.ServiceNotReady;
+                } else {
+                    error = ServerError.UnknownError;
+                    watchers.remove(watcherId, existingWatcherFuture);
+                }
+                connection.getCommandSender().sendErrorResponse(requestId, 
error,
+                        "Topic list watcher is already present on the 
connection");
+                lookupSemaphore.release();
+                return;
+            }
+        } else {
+            initializeTopicsListWatcher(watcherFuture, namespaceName, 
watcherId, topicsPattern);
+        }
+
+
+        CompletableFuture<TopicListWatcher> finalWatcherFuture = watcherFuture;
+        finalWatcherFuture.thenAccept(watcher -> {
+                    List<String> topicList = watcher.getMatchingTopics();
+                    String hash = TopicList.calculateHash(topicList);
+                    if (hash.equals(topicsHash)) {
+                        topicList = Collections.emptyList();
+                    }
+                    if (log.isDebugEnabled()) {
+                        log.debug(
+                                "[{}] Received WatchTopicList for namespace 
[//{}] by {}",
+                                connection.getRemoteAddress(), namespaceName, 
requestId);
+                    }
+                    
connection.getCommandSender().sendWatchTopicListSuccess(requestId, watcherId, 
hash, topicList);
+                    lookupSemaphore.release();
+                })
+                .exceptionally(ex -> {
+                    log.warn("[{}] Error WatchTopicList for namespace [//{}] 
by {}",
+                            connection.getRemoteAddress(), namespaceName, 
requestId);
+                    connection.getCommandSender().sendErrorResponse(requestId,
+                            BrokerServiceException.getClientErrorCode(
+                                    new 
BrokerServiceException.ServerMetadataException(ex)), ex.getMessage());
+                    watchers.remove(watcherId, finalWatcherFuture);
+                    lookupSemaphore.release();
+                    return null;
+                });
+    }
+
+
+    /**
+     * Asynchronously creates a watcher for the namespace and completes {@code watcherFuture} with it.
+     *
+     * <p>The current list of persistent topics is fetched, a {@link TopicListWatcher} is built from it and
+     * registered as a persistent topic listener. If fetching or registering fails, the future is completed
+     * exceptionally with the same cause.
+     *
+     * @param watcherFuture future to complete with the new watcher or with the failure
+     * @param namespace namespace whose persistent topics are listed and observed
+     * @param watcherId id handed to the new watcher
+     * @param topicsPattern pattern handed to the new watcher
+     */
+    public void initializeTopicsListWatcher(CompletableFuture<TopicListWatcher> watcherFuture,
+            NamespaceName namespace, long watcherId, Pattern topicsPattern) {
+        namespaceService.getListOfPersistentTopics(namespace).
+                thenApply(topics -> {
+                    TopicListWatcher watcher = new TopicListWatcher(this, watcherId, topicsPattern, topics);
+                    topicResources.registerPersistentTopicListener(namespace, watcher);
+                    return watcher;
+                }).
+                whenComplete((watcher, exception) -> {
+                    if (exception != null) {
+                        watcherFuture.completeExceptionally(exception);
+                    } else if (!watcherFuture.complete(watcher)) {
+                        // The future was completed by someone else in the meantime (deleteTopicListWatcher
+                        // fails it when the watcher is closed before creation finishes). Nobody will ever
+                        // see this watcher, so undo its registration instead of leaking the listener.
+                        log.warn("[{}] Watcher future was already completed. Deregistering watcherId={}.",
+                                connection.getRemoteAddress(), watcherId);
+                        topicResources.deregisterPersistentTopicListener(watcher);
+                    }
+                });
+    }
+
+
+    /**
+     * Handles a client request to stop watching: removes the watcher and acknowledges the request.
+     *
+     * @param commandWatchTopicListClose command carrying the request id and the id of the watcher to close
+     */
+    public void handleWatchTopicListClose(CommandWatchTopicListClose commandWatchTopicListClose) {
+        // Read both ids out of the command before doing any other work with them.
+        final long closeRequestId = commandWatchTopicListClose.getRequestId();
+        final long closedWatcherId = commandWatchTopicListClose.getWatcherId();
+        deleteTopicListWatcher(closedWatcherId);
+        connection.getCommandSender().sendWatchTopicListSuccess(closeRequestId, closedWatcherId, null, null);
+    }
+
+    /**
+     * Removes the watcher tracked under {@code watcherId}, if there is one.
+     *
+     * <p>Three situations are handled: the watcher is still being created (its future is failed so the
+     * pending creation is discarded), its creation already failed (only the map entry is dropped), or it
+     * exists (its listener is deregistered before the entry is dropped).
+     *
+     * @param watcherId id of the watcher to remove
+     */
+    public void deleteTopicListWatcher(Long watcherId) {
+        CompletableFuture<TopicListWatcher> watcherFuture = watchers.get(watcherId);
+        if (watcherFuture == null) {
+            // Arguments follow the "[remote address] ...: watcherId" layout used by the other log lines.
+            log.info("[{}] TopicListWatcher was not registered on the connection: {}",
+                    connection.getRemoteAddress(), watcherId);
+            return;
+        }
+
+        if (!watcherFuture.isDone() && watcherFuture
+                .completeExceptionally(new IllegalStateException("Closed watcher before creation was complete"))) {
+            // We have received a request to close the watcher before it was actually completed, we have
+            // marked the watcher future as failed and we can tell the client the close operation was
+            // successful. When the actual create operation will complete, the new watcher will be discarded.
+            log.info("[{}] Closed watcher before its creation was completed. watcherId={}",
+                    connection.getRemoteAddress(), watcherId);
+            watchers.remove(watcherId);
+            return;
+        }
+
+        if (watcherFuture.isCompletedExceptionally()) {
+            log.info("[{}] Closed watcher that already failed to be created. watcherId={}",
+                    connection.getRemoteAddress(), watcherId);
+            watchers.remove(watcherId);
+            return;
+        }
+
+        // Proceed with normal watcher close
+        topicResources.deregisterPersistentTopicListener(watcherFuture.getNow(null));
+        watchers.remove(watcherId);
+        log.info("[{}] Closed watcher, watcherId={}", connection.getRemoteAddress(), watcherId);
+    }
+
+    /**
+     * Forwards a topic list change to the client through the connection's command sender.
+     *
+     * <p>Note that the command sender takes the new topics before the deleted ones and the hash last,
+     * which differs from the parameter order of this method.
+     *
+     * @param watcherId id of the watcher the update belongs to
+     * @param topicsHash hash passed along with the update
+     * @param deletedTopics topics reported as deleted
+     * @param newTopics topics reported as new
+     */
+    public void sendTopicListUpdate(long watcherId, String topicsHash, List<String> deletedTopics,
+                                    List<String> newTopics) {
+        PulsarCommandSender sender = connection.getCommandSender();
+        sender.sendWatchTopicListUpdate(watcherId, newTopics, deletedTopics, topicsHash);
+    }
+
+
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 1efc2576a2e..dd86d6189d1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -102,6 +102,7 @@ import 
org.apache.pulsar.common.api.proto.CommandSendReceipt;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.CommandSuccess;
+import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
 import org.apache.pulsar.common.api.proto.ServerError;
@@ -216,6 +217,8 @@ public class ServerCnxTest {
         
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkTopicOwnership(any());
         
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfTopics(
                 NamespaceName.get("use", "ns-abc"), 
CommandGetTopicsOfNamespace.Mode.ALL);
+        
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfPersistentTopics(
+                NamespaceName.get("use", "ns-abc"));
 
         setupMLAsyncCallbackMocks();
 
@@ -1960,6 +1963,26 @@ public class ServerCnxTest {
         channel.finish();
     }
 
+    /**
+     * Sends a WatchTopicList command through the channel and checks that the success response carries
+     * the matching topics, their hash and the watcher id from the request.
+     */
+    @Test
+    public void testWatchTopicList() throws Exception {
+        svcConfig.setEnableBrokerSideSubscriptionPatternEvaluation(true);
+        resetChannel();
+        setChannelConnected();
+        BaseCommand command = Commands.newWatchTopicList(1, 3, "use/ns-abc", "use/ns-abc/topic-.*", null);
+        ByteBuf serializedCommand = Commands.serializeWithSize(command);
+
+        channel.writeInbound(serializedCommand);
+        // The cast doubles as the assertion that the broker answered with a success response.
+        CommandWatchTopicListSuccess response = (CommandWatchTopicListSuccess) getResponse();
+
+        assertEquals(response.getTopicsList(), matchingTopics);
+        assertEquals(response.getTopicsHash(), TopicList.calculateHash(matchingTopics));
+        assertEquals(response.getWatcherId(), 3);
+
+        channel.finish();
+    }
+
     @Test
     public void testNeverDelayConsumerFutureWhenNotFail() throws Exception{
         // Mock ServerCnx.field: consumers
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java
new file mode 100644
index 00000000000..924019f7a54
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.resources.TopicResources;
+import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
+import org.apache.pulsar.common.api.proto.ServerError;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.topics.TopicList;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+import java.util.regex.Pattern;
+
+public class TopicListServiceTest {
+
+    private TopicListService topicListService;
+    private ServerCnx connection;
+    private CompletableFuture<List<String>> topicListFuture;
+    private Semaphore lookupSemaphore;
+    private TopicResources topicResources;
+
+    @BeforeMethod(alwaysRun = true)
+    public void setup() throws Exception {
+        lookupSemaphore = new Semaphore(1);
+        lookupSemaphore.acquire();
+        topicListFuture = new CompletableFuture<>();
+        topicResources = mock(TopicResources.class);
+
+        PulsarService pulsar = mock(PulsarService.class);
+        
when(pulsar.getNamespaceService()).thenReturn(mock(NamespaceService.class));
+        
when(pulsar.getPulsarResources()).thenReturn(mock(PulsarResources.class));
+        
when(pulsar.getPulsarResources().getTopicResources()).thenReturn(topicResources);
+        
when(pulsar.getNamespaceService().getListOfPersistentTopics(any())).thenReturn(topicListFuture);
+
+
+        connection = mock(ServerCnx.class);
+        when(connection.getRemoteAddress()).thenReturn(new 
InetSocketAddress(10000));
+        
when(connection.getCommandSender()).thenReturn(mock(PulsarCommandSender.class));
+
+        topicListService = new TopicListService(pulsar, connection, true, 30);
+
+    }
+
+    @Test
+    public void testCommandWatchSuccessResponse() {
+
+        topicListService.handleWatchTopicList(
+                NamespaceName.get("tenant/ns"),
+                13,
+                7,
+                Pattern.compile("persistent://tenant/ns/topic\\d"),
+                null,
+                lookupSemaphore);
+        List<String> topics = 
Collections.singletonList("persistent://tenant/ns/topic1");
+        String hash = TopicList.calculateHash(topics);
+        topicListFuture.complete(topics);
+        Assert.assertEquals(1, lookupSemaphore.availablePermits());
+        verify(topicResources).registerPersistentTopicListener(
+                eq(NamespaceName.get("tenant/ns")), 
any(TopicListService.TopicListWatcher.class));
+        verify(connection.getCommandSender()).sendWatchTopicListSuccess(7, 13, 
hash, topics);
+    }
+
+    @Test
+    public void testCommandWatchErrorResponse() {
+        topicListService.handleWatchTopicList(
+                NamespaceName.get("tenant/ns"),
+                13,
+                7,
+                Pattern.compile("persistent://tenant/ns/topic\\d"),
+                null,
+                lookupSemaphore);
+        topicListFuture.completeExceptionally(new 
PulsarServerException("Error"));
+        Assert.assertEquals(1, lookupSemaphore.availablePermits());
+        verifyNoInteractions(topicResources);
+        verify(connection.getCommandSender()).sendErrorResponse(eq(7L), 
any(ServerError.class),
+                eq(PulsarServerException.class.getCanonicalName() + ": 
Error"));
+    }
+
+    @Test
+    public void testCommandWatchTopicListCloseRemovesListener() {
+        topicListService.handleWatchTopicList(
+                NamespaceName.get("tenant/ns"),
+                13,
+                7,
+                Pattern.compile("persistent://tenant/ns/topic\\d"),
+                null,
+                lookupSemaphore);
+        List<String> topics = 
Collections.singletonList("persistent://tenant/ns/topic1");
+        topicListFuture.complete(topics);
+
+        CommandWatchTopicListClose watchTopicListClose = new 
CommandWatchTopicListClose()
+                .setRequestId(8)
+                .setWatcherId(13);
+        topicListService.handleWatchTopicListClose(watchTopicListClose);
+        
verify(topicResources).deregisterPersistentTopicListener(any(TopicListService.TopicListWatcher.class));
+    }
+
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java
new file mode 100644
index 00000000000..1a2e192d1f5
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.pulsar.common.topics.TopicList;
+import org.apache.pulsar.metadata.api.NotificationType;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Unit tests for {@link TopicListService.TopicListWatcher}, using a mocked {@link TopicListService} to
+ * observe the updates the watcher sends.
+ */
+public class TopicListWatcherTest {
+
+    // Two topics match PATTERN; "t3" does not.
+    private static final List<String> INITIAL_TOPIC_LIST = Arrays.asList(
+            "persistent://tenant/ns/topic1",
+            "persistent://tenant/ns/topic2",
+            "persistent://tenant/ns/t3"
+    );
+
+    private static final long ID = 7;
+    private static final Pattern PATTERN = Pattern.compile("persistent://tenant/ns/topic\\d+");
+
+
+    private TopicListService topicListService;
+    private TopicListService.TopicListWatcher watcher;
+
+
+
+    @BeforeMethod(alwaysRun = true)
+    public void setup() {
+        topicListService = mock(TopicListService.class);
+        watcher = new TopicListService.TopicListWatcher(topicListService, ID, PATTERN, INITIAL_TOPIC_LIST);
+    }
+
+    /** Only the topics matching the pattern are reported. */
+    @Test
+    public void testGetMatchingTopicsReturnsFilteredList() {
+        // TestNG argument order is (actual, expected).
+        Assert.assertEquals(
+                watcher.getMatchingTopics(),
+                Arrays.asList("persistent://tenant/ns/topic1", "persistent://tenant/ns/topic2"));
+    }
+
+    /** A matching topic creation is sent as an update and added to the matching topics. */
+    @Test
+    public void testAcceptSendsNotificationAndRemembersTopic() {
+        String newTopic = "persistent://tenant/ns/topic3";
+        watcher.accept(newTopic, NotificationType.Created);
+
+        List<String> allMatchingTopics = Arrays.asList(
+                "persistent://tenant/ns/topic1", "persistent://tenant/ns/topic2", newTopic);
+        String hash = TopicList.calculateHash(allMatchingTopics);
+        verify(topicListService).sendTopicListUpdate(ID, hash, Collections.emptyList(),
+                Collections.singletonList(newTopic));
+        Assert.assertEquals(
+                watcher.getMatchingTopics(),
+                allMatchingTopics);
+    }
+
+    /** A matching topic deletion is sent as an update and removed from the matching topics. */
+    @Test
+    public void testAcceptSendsNotificationAndForgetsTopic() {
+        String deletedTopic = "persistent://tenant/ns/topic1";
+        watcher.accept(deletedTopic, NotificationType.Deleted);
+
+        List<String> allMatchingTopics = Collections.singletonList("persistent://tenant/ns/topic2");
+        String hash = TopicList.calculateHash(allMatchingTopics);
+        verify(topicListService).sendTopicListUpdate(ID, hash,
+                Collections.singletonList(deletedTopic), Collections.emptyList());
+        Assert.assertEquals(
+                watcher.getMatchingTopics(),
+                allMatchingTopics);
+    }
+
+    /** A notification for a topic that does not match the pattern sends nothing and changes nothing. */
+    @Test
+    public void testAcceptIgnoresNonMatching() {
+        watcher.accept("persistent://tenant/ns/mytopic", NotificationType.Created);
+        verifyNoInteractions(topicListService);
+        Assert.assertEquals(
+                watcher.getMatchingTopics(),
+                Arrays.asList("persistent://tenant/ns/topic1", "persistent://tenant/ns/topic2"));
+    }
+
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
index 92f2617cc68..a1782278966 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.utils;
 import java.util.Queue;
 
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
+import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
 import org.apache.pulsar.common.protocol.PulsarDecoder;
 import org.apache.pulsar.common.api.proto.CommandAck;
 import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
@@ -156,6 +157,11 @@ public class ClientChannelHelper {
         protected void 
handleGetTopicsOfNamespaceSuccess(CommandGetTopicsOfNamespaceResponse response) 
{
             queue.offer(new 
CommandGetTopicsOfNamespaceResponse().copyFrom(response));
         }
+
+        @Override
+        protected void handleCommandWatchTopicListSuccess(CommandWatchTopicListSuccess commandWatchTopicListSuccess) {
+            // Queue a copy rather than the received object, as the neighbouring handlers do
+            // (presumably the decoder reuses its command instances - confirm).
+            CommandWatchTopicListSuccess copy =
+                    new CommandWatchTopicListSuccess().copyFrom(commandWatchTopicListSuccess);
+            queue.offer(copy);
+        }
     };
 
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
index 4353297666a..8c8e4b96add 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
@@ -719,7 +719,7 @@ public class ClientErrorsTest {
         AtomicBoolean msgSent = new AtomicBoolean();
         mockBrokerService.setHandleConnect((ctx, connect) -> {
             channelCtx.set(ctx);
-            
ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion()));
+            
ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion(), false));
             if (numOfConnections.incrementAndGet() == 2) {
                 // close the cnx immediately when trying to connect the 2nd 
time
                 ctx.channel().close();
@@ -759,7 +759,7 @@ public class ClientErrorsTest {
         CountDownLatch latch = new CountDownLatch(1);
         mockBrokerService.setHandleConnect((ctx, connect) -> {
             channelCtx.set(ctx);
-            
ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion()));
+            
ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion(), false));
             if (numOfConnections.incrementAndGet() == 2) {
                 // close the cnx immediately when trying to connect the 2nd 
time
                 ctx.channel().close();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
index b16fac427aa..7cba7405ddb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
@@ -143,7 +143,7 @@ public class MockBrokerService {
                 return;
             }
             // default
-            
ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion()));
+            
ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion(), false));
         }
 
         @Override
diff --git a/pulsar-client-cpp/lib/Commands.cc 
b/pulsar-client-cpp/lib/Commands.cc
index 33134a6c04b..3a00d302e86 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -669,6 +669,18 @@ std::string Commands::messageType(BaseCommand_Type type) {
         case BaseCommand::TC_CLIENT_CONNECT_RESPONSE:
             return "TC_CLIENT_CONNECT_RESPONSE";
             break;
+        case BaseCommand::WATCH_TOPIC_LIST:
+            return "WATCH_TOPIC_LIST";
+            break;
+        case BaseCommand::WATCH_TOPIC_LIST_SUCCESS:
+            return "WATCH_TOPIC_LIST_SUCCESS";
+            break;
+        case BaseCommand::WATCH_TOPIC_UPDATE:
+            return "WATCH_TOPIC_UPDATE";
+            break;
+        case BaseCommand::WATCH_TOPIC_LIST_CLOSE:
+            return "WATCH_TOPIC_LIST_CLOSE";
+            break;
     };
     BOOST_THROW_EXCEPTION(std::logic_error("Invalid BaseCommand enumeration 
value"));
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 2106b7a5c6f..18bffba8dc6 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -256,6 +256,11 @@ public class BinaryProtoLookupService implements 
LookupService {
         return serviceNameResolver.getServiceUrl();
     }
 
+    /**
+     * Returns the address obtained from {@code serviceNameResolver.resolveHost()}.
+     */
+    @Override
+    public InetSocketAddress resolveHost() {
+        return serviceNameResolver.resolveHost();
+    }
+
     @Override
     public CompletableFuture<GetTopicsResult> 
getTopicsUnderNamespace(NamespaceName namespace,
                                                                                
   Mode mode,
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index d52955ccf0b..37464dacfd4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -44,6 +44,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import lombok.AccessLevel;
 import lombok.Getter;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.commons.lang3.tuple.Pair;
@@ -59,6 +60,7 @@ import org.apache.pulsar.client.impl.schema.SchemaInfoUtil;
 import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
 import org.apache.pulsar.client.util.TimedCompletableFuture;
 import org.apache.pulsar.common.api.AuthData;
+import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.CommandAckResponse;
 import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange;
 import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
@@ -85,6 +87,8 @@ import org.apache.pulsar.common.api.proto.CommandSendError;
 import org.apache.pulsar.common.api.proto.CommandSendReceipt;
 import org.apache.pulsar.common.api.proto.CommandSuccess;
 import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse;
+import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
+import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.lookup.GetTopicsResult;
 import org.apache.pulsar.common.protocol.Commands;
@@ -128,6 +132,12 @@ public class ClientCnx extends PulsarHandler {
                     .expectedItems(16)
                     .concurrencyLevel(1)
                     .build();
+    @Getter(AccessLevel.PACKAGE)
+    private final ConcurrentLongHashMap<TopicListWatcher> topicListWatchers =
+            ConcurrentLongHashMap.<TopicListWatcher>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(1)
+                    .build();
 
     private final CompletableFuture<Void> connectionFuture = new 
CompletableFuture<Void>();
     private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new 
ConcurrentLinkedQueue<>();
@@ -160,6 +170,7 @@ public class ClientCnx extends PulsarHandler {
     @Getter
     protected AuthenticationDataProvider authenticationDataProvider;
     private TransactionBufferHandler transactionBufferHandler;
+    private boolean supportsTopicWatchers;
 
     enum State {
         None, SentConnectFrame, Ready, Failed, Connecting
@@ -281,12 +292,14 @@ public class ClientCnx extends PulsarHandler {
         producers.forEach((id, producer) -> producer.connectionClosed(this));
         consumers.forEach((id, consumer) -> consumer.connectionClosed(this));
         transactionMetaStoreHandlers.forEach((id, handler) -> 
handler.connectionClosed(this));
+        topicListWatchers.forEach((__, watcher) -> 
watcher.connectionClosed(this));
 
         pendingRequests.clear();
         waitingLookupRequests.clear();
 
         producers.clear();
         consumers.clear();
+        topicListWatchers.clear();
 
         timeoutTask.cancel(true);
     }
@@ -330,6 +343,10 @@ public class ClientCnx extends PulsarHandler {
         if (log.isDebugEnabled()) {
             log.debug("{} Connection is ready", ctx.channel());
         }
+
+        supportsTopicWatchers =
+            connected.hasFeatureFlags() && 
connected.getFeatureFlags().isSupportsTopicWatchers();
+
         // set remote protocol version to the correct version before we 
complete the connection future
         setRemoteEndpointProtocolVersion(connected.getProtocolVersion());
         connectionFuture.complete(null);
@@ -1036,6 +1053,51 @@ public class ClientCnx extends PulsarHandler {
         return transactionBufferHandler;
     }
 
+    /**
+     * Sends a WatchTopicList command on this connection.
+     *
+     * @param commandWatchTopicList command to serialize and send
+     * @param requestId id under which the request is tracked
+     * @return future of the broker's success response; already failed with
+     *         {@link PulsarClientException.NotAllowedException} when the broker did not advertise
+     *         support for topic watchers
+     */
+    public CompletableFuture<CommandWatchTopicListSuccess> newWatchTopicList(
+            BaseCommand commandWatchTopicList, long requestId) {
+        if (supportsTopicWatchers) {
+            return sendRequestAndHandleTimeout(
+                    Commands.serializeWithSize(commandWatchTopicList), requestId, RequestType.Command, true);
+        }
+        // Without broker support nothing is sent; the caller gets a failed future instead.
+        PulsarClientException.NotAllowedException notAllowed = new PulsarClientException.NotAllowedException(
+                "Broker does not allow broker side pattern evaluation.");
+        return FutureUtil.failedFuture(notAllowed);
+    }
+
+    /**
+     * Completes the pending request that matches the request id of a WatchTopicListSuccess response.
+     *
+     * <p>The request is removed from {@code pendingRequests}; a response whose request id is not pending
+     * is only logged.
+     *
+     * @param commandWatchTopicListSuccess response received from the broker
+     */
+    @Override
+    protected void handleCommandWatchTopicListSuccess(CommandWatchTopicListSuccess commandWatchTopicListSuccess) {
+        checkArgument(state == State.Ready);
+
+        long requestId = commandWatchTopicListSuccess.getRequestId();
+        if (log.isDebugEnabled()) {
+            log.debug("{} Received watchTopicListSuccess response from server: {}", ctx.channel(), requestId);
+        }
+        CompletableFuture<CommandWatchTopicListSuccess> requestFuture =
+                (CompletableFuture<CommandWatchTopicListSuccess>) pendingRequests.remove(requestId);
+        if (requestFuture != null) {
+            requestFuture.complete(commandWatchTopicListSuccess);
+        } else {
+            log.warn("{} Received unknown request id from server: {}", ctx.channel(), requestId);
+        }
+    }
+
+    protected void handleCommandWatchTopicUpdate(CommandWatchTopicUpdate 
commandWatchTopicUpdate) {
+        checkArgument(state == State.Ready);
+
+        if (log.isDebugEnabled()) {
+            log.debug("{} Received watchTopicUpdate command from server: {}",
+                    ctx.channel(), commandWatchTopicUpdate.getWatcherId());
+        }
+        long watcherId = commandWatchTopicUpdate.getWatcherId();
+        TopicListWatcher watcher = topicListWatchers.get(watcherId);
+        if (watcher != null) {
+            watcher.handleCommandWatchTopicUpdate(commandWatchTopicUpdate);
+        } else {
+            log.warn("{} Received topic list update for unknown watcher from 
server: {}", ctx.channel(), watcherId);
+        }
+    }
+
     /**
      * check serverError and take appropriate action.
      * <ul>
@@ -1082,6 +1144,11 @@ public class ClientCnx extends PulsarHandler {
         transactionMetaStoreHandlers.put(transactionMetaStoreId, handler);
     }
 
+    /**
+     * Stores the watcher under the given id in {@code topicListWatchers}.
+     *
+     * @param watcherId id used as the map key
+     * @param watcher watcher to store
+     */
+    void registerTopicListWatcher(final long watcherId, final TopicListWatcher watcher) {
+        topicListWatchers.put(watcherId, watcher);
+    }
+
     public void registerTransactionBufferHandler(final 
TransactionBufferHandler handler) {
         transactionBufferHandler = handler;
     }
@@ -1094,6 +1161,10 @@ public class ClientCnx extends PulsarHandler {
         consumers.remove(consumerId);
     }
 
+    /**
+     * Removes the entry stored under the given id from {@code topicListWatchers}.
+     *
+     * @param watcherId id of the watcher to remove
+     */
+    void removeTopicListWatcher(final long watcherId) {
+        topicListWatchers.remove(watcherId);
+    }
+
     void setTargetBroker(InetSocketAddress targetBrokerAddress) {
         this.proxyToTargetBrokerAddress = String.format("%s:%d", 
targetBrokerAddress.getHostString(),
                 targetBrokerAddress.getPort());
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index a951d7b2cb8..6c5a5be200f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -69,8 +70,13 @@ public class ConnectionHandler {
         }
 
         try {
-            state.client.getConnection(state.topic) //
-                    .thenAccept(cnx -> connection.connectionOpened(cnx)) //
+            CompletableFuture<ClientCnx> cnxFuture;
+            if (state.topic == null) {
+                cnxFuture = state.client.getConnectionToServiceUrl();
+            } else {
+                cnxFuture = state.client.getConnection(state.topic); //
+            }
+            cnxFuture.thenAccept(cnx -> connection.connectionOpened(cnx)) //
                     .exceptionally(this::handleConnectionError);
         } catch (Throwable t) {
             log.warn("[{}] [{}] Exception thrown while getting connection: ", 
state.topic, state.getHandlerName(), t);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index 82530661be0..61d0c2bd64e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -155,6 +155,10 @@ public class HttpClient implements Closeable {
         return this.serviceNameResolver.getServiceUrl();
     }
 
+    /**
+     * Asks the service name resolver to resolve a host.
+     *
+     * @return the socket address returned by the service name resolver
+     */
+    public InetSocketAddress resolveHost() {
+        return serviceNameResolver.resolveHost();
+    }
+
     void setServiceUrl(String serviceUrl) throws PulsarClientException {
         this.serviceNameResolver.updateServiceUrl(serviceUrl);
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 6d04c5fab31..def19c45aff 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -123,6 +123,11 @@ public class HttpLookupService implements LookupService {
         return httpClient.getServiceUrl();
     }
 
+    @Override
+    public InetSocketAddress resolveHost() {
+        // Resolution is delegated to the underlying HTTP client.
+        return httpClient.resolveHost();
+    }
+
     @Override
     public CompletableFuture<GetTopicsResult> 
getTopicsUnderNamespace(NamespaceName namespace, Mode mode,
                                                                       String 
topicsPattern, String topicsHash) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
index e7d358148c0..c1c93f36c6e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
@@ -90,6 +90,13 @@ public interface LookupService extends AutoCloseable {
      */
     String getServiceUrl();
 
+    /**
+     * Resolves the Pulsar service URL to a socket address.
+     *
+     * @return the service URL resolved to a socket address
+     */
+    InetSocketAddress resolveHost();
+
     /**
      * Returns all the topics name for a given namespace.
      *
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index 9d00b2c2827..79a7c6a3ae6 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
 import static com.google.common.base.Preconditions.checkArgument;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
 import java.util.ArrayList;
@@ -47,6 +48,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
     private final Pattern topicsPattern;
     private final TopicsChangedListener topicsChangeListener;
     private final Mode subscriptionMode;
+    private final CompletableFuture<TopicListWatcher> watcherFuture;
     protected NamespaceName namespaceName;
     private volatile Timeout recheckPatternTimeout = null;
     private volatile String topicsHash;
@@ -74,6 +76,19 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
         this.topicsChangeListener = new PatternTopicsChangedListener();
         this.recheckPatternTimeout = client.timer()
                 .newTimeout(this, Math.max(1, 
conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
+        this.watcherFuture = new CompletableFuture<>();
+        if (subscriptionMode == Mode.PERSISTENT) {
+            long watcherId = client.newTopicListWatcherId();
+            new TopicListWatcher(topicsChangeListener, client, topicsPattern, 
watcherId,
+                namespaceName, topicsHash, watcherFuture);
+            watcherFuture.exceptionally(ex -> {
+                log.debug("Unable to create topic list watcher. Falling back 
to only polling for new topics", ex);
+                return null;
+            });
+        } else {
+            log.debug("Not creating topic list watcher for subscription mode 
{}", subscriptionMode);
+            watcherFuture.complete(null);
+        }
     }
 
     public static NamespaceName getNameSpaceFromPattern(Pattern pattern) {
@@ -172,7 +187,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
             FutureUtil.waitForAll(futures)
                 .thenAccept(finalFuture -> removeFuture.complete(null))
                 .exceptionally(ex -> {
-                    log.warn("[{}] Failed to subscribe topics: {}", topic, 
ex.getMessage());
+                    log.warn("[{}] Failed to unsubscribe from topics: {}", 
topic, ex.getMessage());
                     removeFuture.completeExceptionally(ex);
                 return null;
             });
@@ -194,7 +209,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
             FutureUtil.waitForAll(futures)
                 .thenAccept(finalFuture -> addFuture.complete(null))
                 .exceptionally(ex -> {
-                    log.warn("[{}] Failed to unsubscribe topics: {}", topic, 
ex.getMessage());
+                    log.warn("[{}] Failed to subscribe to topics: {}", topic, 
ex.getMessage());
                     addFuture.completeExceptionally(ex);
                     return null;
                 });
@@ -203,13 +218,23 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
     }
 
     @Override
+    @SuppressFBWarnings
     public CompletableFuture<Void> closeAsync() {
+        // Stop the periodic pattern re-check first, then close the topic list watcher (if one was
+        // created) together with the consumer itself.
         Timeout timeout = recheckPatternTimeout;
         if (timeout != null) {
             timeout.cancel();
             recheckPatternTimeout = null;
         }
-        return super.closeAsync();
+        List<CompletableFuture<?>> closeFutures = new ArrayList<>(2);
+        // NOTE(review): only a watcher whose future has already completed successfully is closed here.
+        // A watcher that is still being created when this runs is not closed by this method - confirm
+        // it cannot stay registered with a broker after the consumer is gone.
+        if (watcherFuture.isDone() && 
!watcherFuture.isCompletedExceptionally()) {
+            TopicListWatcher watcher = watcherFuture.getNow(null);
+            // watcher can be null when subscription mode is not persistent
+            if (watcher != null) {
+                closeFutures.add(watcher.closeAsync());
+            }
+        }
+        closeFutures.add(super.closeAsync());
+        return FutureUtil.waitForAll(closeFutures);
     }
 
     @VisibleForTesting
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 2460f4c53f5..2c6339cdb20 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -125,6 +125,7 @@ public class PulsarClientImpl implements PulsarClient {
 
     private final AtomicLong producerIdGenerator = new AtomicLong();
     private final AtomicLong consumerIdGenerator = new AtomicLong();
+    private final AtomicLong topicListWatcherIdGenerator = new AtomicLong();
     private final AtomicLong requestIdGenerator =
             new AtomicLong(ThreadLocalRandom.current().nextLong(0, 
Long.MAX_VALUE / 2));
 
@@ -934,6 +935,11 @@ public class PulsarClientImpl implements PulsarClient {
                 .thenCompose(pair -> getConnection(pair.getLeft(), 
pair.getRight()));
     }
 
+    /**
+     * Obtains a connection to the address the lookup service resolves.
+     *
+     * <p>The resolved address is used as both the logical and the physical address of the connection.
+     *
+     * @return a future for the connection to the resolved address
+     */
+    public CompletableFuture<ClientCnx> getConnectionToServiceUrl() {
+        final InetSocketAddress serviceAddress = lookup.resolveHost();
+        return getConnection(serviceAddress, serviceAddress);
+    }
+
     public CompletableFuture<ClientCnx> getConnection(final InetSocketAddress 
logicalAddress,
                                                       final InetSocketAddress 
physicalAddress) {
         return cnxPool.getConnection(logicalAddress, physicalAddress);
@@ -956,6 +962,10 @@ public class PulsarClientImpl implements PulsarClient {
         return consumerIdGenerator.getAndIncrement();
     }
 
+    /**
+     * Hands out the next topic list watcher id.
+     *
+     * @return the current value of the watcher id counter, which is then incremented
+     */
+    long newTopicListWatcherId() {
+        return topicListWatcherIdGenerator.getAndIncrement();
+    }
+
     public long newRequestId() {
         return requestIdGenerator.getAndIncrement();
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
new file mode 100644
index 00000000000..9cd6b003d7d
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.api.proto.BaseCommand;
+import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.protocol.Commands;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicListWatcher extends HandlerState implements 
ConnectionHandler.Connection {
+
+    private static final Logger log = 
LoggerFactory.getLogger(TopicListWatcher.class);
+
+    private static final AtomicLongFieldUpdater<TopicListWatcher> 
CREATE_WATCHER_DEADLINE_UPDATER =
+            AtomicLongFieldUpdater
+                    .newUpdater(TopicListWatcher.class, 
"createWatcherDeadline");
+
+    private final PatternMultiTopicsConsumerImpl.TopicsChangedListener 
topicsChangeListener;
+    private final String name;
+    private final ConnectionHandler connectionHandler;
+    private final Pattern topicsPattern;
+    private final long watcherId;
+    private volatile long createWatcherDeadline = 0;
+    private final NamespaceName namespace;
+    // TODO maintain the value based on updates from broker and warn the user 
if inconsistent with hash from polling
+    private String topicsHash;
+    private final CompletableFuture<TopicListWatcher> watcherFuture;
+
+    private final List<Throwable> previousExceptions = new 
CopyOnWriteArrayList<>();
+    private final AtomicReference<ClientCnx> 
clientCnxUsedForWatcherRegistration = new AtomicReference<>();
+
+
+    /**
+     * Creates a watcher and immediately starts acquiring a connection on which to register it.
+     *
+     * @param topicsChangeListener listener notified of the added and removed topics carried by updates
+     * @param client the client used for configuration, request ids and connections
+     * @param topicsPattern the pattern sent to the broker in the watch request
+     * @param watcherId the id this watcher is registered under on a connection
+     * @param namespace the namespace sent to the broker in the watch request
+     * @param topicsHash the topics hash sent to the broker in the watch request
+     * @param watcherFuture completed with this watcher once registration succeeds, or exceptionally
+     *                      when registration fails without being retried
+     */
+    public 
TopicListWatcher(PatternMultiTopicsConsumerImpl.TopicsChangedListener 
topicsChangeListener,
+                            PulsarClientImpl client, Pattern topicsPattern, 
long watcherId,
+                            NamespaceName namespace, String topicsHash,
+                            CompletableFuture<TopicListWatcher> watcherFuture) 
{
+        // NOTE(review): the null second argument is presumably the handler's topic - a watcher is
+        // not bound to a single topic; confirm against HandlerState.
+        super(client, null);
+        this.topicsChangeListener = topicsChangeListener;
+        this.name = "Watcher(" + topicsPattern + ")";
+        // Backoff bounds come from the client configuration; no mandatory stop is configured.
+        this.connectionHandler = new ConnectionHandler(this,
+                new BackoffBuilder()
+                        
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
+                                TimeUnit.NANOSECONDS)
+                        
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), 
TimeUnit.NANOSECONDS)
+                        .setMandatoryStop(0, TimeUnit.MILLISECONDS)
+                        .create(),
+                this);
+        this.topicsPattern = topicsPattern;
+        this.watcherId = watcherId;
+        this.namespace = namespace;
+        this.topicsHash = topicsHash;
+        this.watcherFuture = watcherFuture;
+
+        // All fields are assigned before the connection attempt starts.
+        connectionHandler.grabCnx();
+    }
+
+    /**
+     * Handles a failed connection attempt.
+     *
+     * <p>A retriable error is only remembered in {@code previousExceptions}. A non-retriable error
+     * fails {@code watcherFuture}; if this call is the one that completed the future, the watcher is
+     * also moved to the failed state and deregistered from its connection.
+     *
+     * @param exception the error reported for the connection attempt
+     */
+    @Override
+    public void connectionFailed(PulsarClientException exception) {
+        boolean nonRetriableError = 
!PulsarClientException.isRetriableError(exception);
+        if (nonRetriableError) {
+            // Attach the retriable errors collected so far to the final failure.
+            exception.setPreviousExceptions(previousExceptions);
+            if (watcherFuture.completeExceptionally(exception)) {
+                setState(State.Failed);
+                log.info("[{}] Watcher creation failed for {} with 
non-retriable error {}",
+                        topic, name, exception);
+                deregisterFromClientCnx();
+            }
+        } else {
+            previousExceptions.add(exception);
+        }
+    }
+
+    /**
+     * Registers this watcher with the broker over a newly opened connection.
+     *
+     * <p>If the watcher is already closing or closed, it is marked closed and deregistered without
+     * sending anything. Otherwise a watch-topic-list request is sent. On success the watcher moves to
+     * the ready state, the backoff is reset and {@code watcherFuture} is completed. On failure the
+     * watcher either schedules a reconnect or fails {@code watcherFuture}.
+     *
+     * @param cnx the connection to register the watcher on
+     */
+    @Override
+    public void connectionOpened(ClientCnx cnx) {
+        previousExceptions.clear();
+
+        if (getState() == State.Closing || getState() == State.Closed) {
+            setState(State.Closed);
+            deregisterFromClientCnx();
+            return;
+        }
+
+        log.info("[{}][{}] Creating topic list watcher on cnx {}, watcherId {}",
+                topic, getHandlerName(), cnx.ctx().channel(), watcherId);
+
+        long requestId = client.newRequestId();
+
+        // The deadline is set only while it is still 0, so retries share the first attempt's deadline.
+        CREATE_WATCHER_DEADLINE_UPDATER
+                .compareAndSet(this, 0L, System.currentTimeMillis()
+                        + client.getConfiguration().getOperationTimeoutMs());
+
+        // Hold the monitor while registering on the connection and sending the request; the success
+        // callback takes the same monitor before it changes the watcher state.
+        synchronized (this) {
+            setClientCnx(cnx);
+            BaseCommand watchRequest = Commands.newWatchTopicList(requestId, watcherId, namespace.toString(),
+                            topicsPattern.pattern(), topicsHash);
+
+            cnx.newWatchTopicList(watchRequest, requestId)
+
+                    .thenAccept(response -> {
+                        synchronized (TopicListWatcher.this) {
+                            if (!changeToReadyState()) {
+                                // Watcher was closed while reconnecting, close the connection to
+                                // make sure the broker drops the watcher on its side
+                                setState(State.Closed);
+                                deregisterFromClientCnx();
+                                cnx.channel().close();
+                                return;
+                            }
+                        }
+
+                        this.connectionHandler.resetBackoff();
+
+                        watcherFuture.complete(this);
+
+                    }).exceptionally((e) -> {
+                        deregisterFromClientCnx();
+                        if (getState() == State.Closing || getState() == State.Closed) {
+                            // Watcher was closed while reconnecting, close the connection to
+                            // make sure the broker drops the watcher on its side
+                            cnx.channel().close();
+                            return null;
+                        }
+                        log.warn("[{}][{}] Failed to create topic list watcher on {}", topic,
+                                getHandlerName(), cnx.channel().remoteAddress());
+
+                        if (e.getCause() instanceof PulsarClientException
+                                && PulsarClientException.isRetriableError(e.getCause())
+                                && System.currentTimeMillis()
+                                    < CREATE_WATCHER_DEADLINE_UPDATER.get(TopicListWatcher.this)) {
+                            // Retriable error and still within the creation deadline: try again.
+                            reconnectLater(e.getCause());
+                        } else if (!watcherFuture.isDone()) {
+                            // unable to create new watcher, fail operation
+                            setState(State.Failed);
+                            watcherFuture.completeExceptionally(
+                                    PulsarClientException.wrap(e, String.format(
+                                            "Failed to create topic list watcher %s when connecting to the broker",
+                                            getHandlerName())));
+                        } else {
+                            // watcher was already created and connected, but we got some error, keep trying
+                            reconnectLater(e.getCause());
+                        }
+                        return null;
+                    });
+        }
+    }
+
+    /**
+     * @return the watcher name built in the constructor, of the form {@code Watcher(<pattern>)}
+     */
+    @Override
+    String getHandlerName() {
+        return name;
+    }
+
+    /**
+     * Tells whether the watcher currently has a connection and is in the ready state.
+     *
+     * @return {@code true} only if a connection is set and the state is {@code State.Ready}
+     */
+    public boolean isConnected() {
+        if (getClientCnx() == null) {
+            return false;
+        }
+        return getState() == State.Ready;
+    }
+
+    /**
+     * @return the connection currently held by the connection handler
+     */
+    public ClientCnx getClientCnx() {
+        return this.connectionHandler.cnx();
+    }
+
+    /**
+     * Closes this watcher.
+     *
+     * <p>Returns an already completed future when the watcher is closing or closed. When it is not
+     * connected, it is marked closed and deregistered locally. Otherwise a close request is sent on
+     * the current connection and the returned future completes once the request has been handled.
+     *
+     * @return a future completed when the watcher is closed; it completes exceptionally only when the
+     *         close request fails while the channel is still active
+     */
+    public CompletableFuture<Void> closeAsync() {
+
+        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+
+        // Nothing to do if a close is already in progress or finished.
+        if (getState() == State.Closing || getState() == State.Closed) {
+            closeFuture.complete(null);
+            return closeFuture;
+        }
+
+        // Without a ready connection no close request is sent; clean up locally only.
+        if (!isConnected()) {
+            log.info("[{}] [{}] Closed watcher (not connected)", topic, 
getHandlerName());
+            setState(State.Closed);
+            deregisterFromClientCnx();
+            closeFuture.complete(null);
+            return closeFuture;
+        }
+
+        setState(State.Closing);
+
+
+        long requestId = client.newRequestId();
+
+        // The connection is read again here and may be null even though isConnected() passed above.
+        ClientCnx cnx = cnx();
+        if (null == cnx) {
+            cleanupAtClose(closeFuture, null);
+        } else {
+            BaseCommand cmd = Commands.newWatchTopicListClose(watcherId, 
requestId);
+            cnx.sendRequestWithId(Commands.serializeWithSize(cmd), 
requestId).handle((v, exception) -> {
+                final ChannelHandlerContext ctx = cnx.ctx();
+                // A failure is ignored when the channel is gone or no longer active.
+                boolean ignoreException = ctx == null || 
!ctx.channel().isActive();
+                if (ignoreException && exception != null) {
+                    log.debug("Exception ignored in closing watcher", 
exception);
+                }
+                cleanupAtClose(closeFuture, ignoreException ? null : 
exception);
+                return null;
+            });
+        }
+
+        return closeFuture;
+    }
+
+    // wrapper for connection methods
+    /**
+     * @return the connection currently held by the connection handler
+     */
+    ClientCnx cnx() {
+        return this.connectionHandler.cnx();
+    }
+
+    /**
+     * Forwards a connection-closed notification to the connection handler.
+     *
+     * @param clientCnx the connection that was closed
+     */
+    public void connectionClosed(ClientCnx clientCnx) {
+        this.connectionHandler.connectionClosed(clientCnx);
+    }
+
+    /**
+     * Records which connection this watcher is registered on and keeps the per-connection watcher
+     * registrations in line with it.
+     *
+     * <p>A non-null connection is handed to the connection handler and gets this watcher registered
+     * under {@code watcherId}. The previously recorded connection, if there is one and it differs from
+     * the new one, has the watcher removed. Passing {@code null} therefore only deregisters; the
+     * connection handler is not touched in that case.
+     *
+     * @param clientCnx the connection to register on, or {@code null} to deregister from the current one
+     */
+    void setClientCnx(ClientCnx clientCnx) {
+        if (clientCnx != null) {
+            this.connectionHandler.setClientCnx(clientCnx);
+            clientCnx.registerTopicListWatcher(watcherId, this);
+        }
+        // Swap the recorded connection atomically, then clean up the one that was replaced.
+        ClientCnx previousClientCnx = 
clientCnxUsedForWatcherRegistration.getAndSet(clientCnx);
+        if (previousClientCnx != null && previousClientCnx != clientCnx) {
+            previousClientCnx.removeTopicListWatcher(watcherId);
+        }
+    }
+
+    /**
+     * Removes this watcher from the connection it was last registered on, if any.
+     */
+    void deregisterFromClientCnx() {
+        setClientCnx(null);
+    }
+
+    /**
+     * Asks the connection handler to reconnect later.
+     *
+     * @param exception the error that triggered the reconnect
+     */
+    void reconnectLater(Throwable exception) {
+        this.connectionHandler.reconnectLater(exception);
+    }
+
+
+    /**
+     * Finishes a close: marks the watcher closed, deregisters it from its connection and completes
+     * the close future.
+     *
+     * @param closeFuture the future to complete
+     * @param exception   the failure to complete the future with, or {@code null} for a normal completion
+     */
+    private void cleanupAtClose(CompletableFuture<Void> closeFuture, Throwable exception) {
+        log.info("[{}] Closed topic list watcher", getHandlerName());
+        setState(State.Closed);
+        deregisterFromClientCnx();
+        if (exception == null) {
+            closeFuture.complete(null);
+            return;
+        }
+        closeFuture.completeExceptionally(exception);
+    }
+
+    /**
+     * Passes the topics carried by a broker update on to the topics-changed listener.
+     *
+     * <p>Removed topics are reported before added topics; an empty list is not reported at all.
+     *
+     * @param update the update command received from the broker
+     */
+    public void handleCommandWatchTopicUpdate(CommandWatchTopicUpdate update) {
+        final List<String> removedTopics = update.getDeletedTopicsList();
+        final boolean anyRemoved = !removedTopics.isEmpty();
+        if (anyRemoved) {
+            topicsChangeListener.onTopicsRemoved(removedTopics);
+        }
+
+        final List<String> addedTopics = update.getNewTopicsList();
+        final boolean anyAdded = !addedTopics.isEmpty();
+        if (anyAdded) {
+            topicsChangeListener.onTopicsAdded(addedTopics);
+        }
+    }
+}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index 6ce4afecd02..c6eba43fb7a 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -20,8 +20,10 @@ package org.apache.pulsar.client.impl;
 
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import io.netty.buffer.ByteBuf;
@@ -34,12 +36,16 @@ import java.lang.reflect.Field;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadFactory;
+import java.util.function.Consumer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.api.PulsarClientException.BrokerMetadataException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
 import org.apache.pulsar.common.api.proto.CommandCloseProducer;
+import org.apache.pulsar.common.api.proto.CommandConnected;
 import org.apache.pulsar.common.api.proto.CommandError;
+import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
+import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.PulsarHandler;
@@ -190,4 +196,107 @@ public class ClientCnxTest {
 
         eventLoop.shutdownGracefully();
     }
+
+    /**
+     * A watch request must fail, and leave no watcher registered, when the connected command does not
+     * set the topic-watcher feature flag.
+     */
+    @Test
+    public void testNoWatchersWhenNoServerSupport() {
+        withConnection("testNoWatchersWhenNoServerSupport", cnx -> {
+            // Connected command without any feature flags.
+            CommandConnected handshake = new CommandConnected();
+            handshake.setServerVersion("Some old Server");
+            handshake.setProtocolVersion(1);
+            cnx.handleConnected(handshake);
+
+            CompletableFuture<CommandWatchTopicListSuccess> watchResult = cnx.newWatchTopicList(
+                    Commands.newWatchTopicList(7, 5, "tenant/ns", ".*", null), 7);
+
+            assertTrue(watchResult.isCompletedExceptionally());
+            assertFalse(cnx.getTopicListWatchers().containsKey(5));
+        });
+    }
+
+    /**
+     * With the topic-watcher feature flag set, a watch request is written to the channel and its
+     * future completes with the success response carrying the same request id.
+     */
+    @Test
+    public void testCreateWatcher() {
+        withConnection("testCreateWatcher", cnx -> {
+            CommandConnected handshake = new CommandConnected();
+            handshake.setServerVersion("Some strange Server");
+            handshake.setProtocolVersion(1);
+            handshake.setFeatureFlags().setSupportsTopicWatchers(true);
+            cnx.handleConnected(handshake);
+
+            CompletableFuture<CommandWatchTopicListSuccess> watchResult = cnx.newWatchTopicList(
+                    Commands.newWatchTopicList(7, 5, "tenant/ns", ".*", null), 7);
+            // The request went out but has not been answered yet.
+            verify(cnx.ctx()).writeAndFlush(any(ByteBuf.class));
+            assertFalse(watchResult.isDone());
+
+            CommandWatchTopicListSuccess response = new CommandWatchTopicListSuccess();
+            response.setRequestId(7);
+            response.setWatcherId(5);
+            response.setTopicsHash("f00");
+            cnx.handleCommandWatchTopicListSuccess(response);
+
+            assertEquals(watchResult.getNow(null), response);
+        });
+    }
+
+
+
+    /**
+     * A topic update for a registered watcher id is forwarded to that watcher.
+     */
+    @Test
+    public void testUpdateWatcher() {
+        withConnection("testUpdateWatcher", cnx -> {
+            CommandConnected handshake = new CommandConnected();
+            handshake.setServerVersion("Some Strange Server");
+            handshake.setProtocolVersion(1);
+            handshake.setFeatureFlags().setSupportsTopicWatchers(true);
+            cnx.handleConnected(handshake);
+
+            cnx.newWatchTopicList(Commands.newWatchTopicList(7, 5, "tenant/ns", ".*", null), 7);
+
+            CommandWatchTopicListSuccess response = new CommandWatchTopicListSuccess();
+            response.setRequestId(7);
+            response.setWatcherId(5);
+            response.setTopicsHash("f00");
+            cnx.handleCommandWatchTopicListSuccess(response);
+
+            // Register a mock under the watcher id used by the update below.
+            TopicListWatcher mockWatcher = mock(TopicListWatcher.class);
+            cnx.registerTopicListWatcher(5, mockWatcher);
+
+            CommandWatchTopicUpdate topicUpdate = new CommandWatchTopicUpdate();
+            topicUpdate.setWatcherId(5);
+            topicUpdate.setTopicsHash("ADD");
+            topicUpdate.addNewTopic("persistent://tenant/ns/topic");
+            cnx.handleCommandWatchTopicUpdate(topicUpdate);
+
+            verify(mockWatcher).handleCommandWatchTopicUpdate(topicUpdate);
+        });
+    }
+
+    /**
+     * Runs a test body against a {@code ClientCnx} that is wired to a mocked channel context and put
+     * into the {@code SentConnectFrame} state through reflection. The event loop created for the
+     * connection is shut down afterwards.
+     *
+     * @param testName name given to the event loop's thread factory
+     * @param test the test body, receiving the prepared connection
+     */
+    private void withConnection(String testName, Consumer<ClientCnx> test) {
+        ThreadFactory threadFactory = new DefaultThreadFactory(testName);
+        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, 
threadFactory);
+        try {
+
+            ClientConfigurationData conf = new ClientConfigurationData();
+            ClientCnx cnx = new ClientCnx(conf, eventLoop);
+
+            ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+            Channel channel = mock(Channel.class);
+            when(ctx.channel()).thenReturn(channel);
+
+            // Every write returns a mocked future whose addListener returns the same future.
+            ChannelFuture listenerFuture = mock(ChannelFuture.class);
+            when(listenerFuture.addListener(any())).thenReturn(listenerFuture);
+            when(ctx.writeAndFlush(any())).thenReturn(listenerFuture);
+
+            // Inject the mocked context into the handler's "ctx" field.
+            Field ctxField = PulsarHandler.class.getDeclaredField("ctx");
+            ctxField.setAccessible(true);
+            ctxField.set(cnx, ctx);
+
+            // set connection as SentConnectFrame
+            Field cnxField = ClientCnx.class.getDeclaredField("state");
+            cnxField.setAccessible(true);
+            cnxField.set(cnx, ClientCnx.State.SentConnectFrame);
+
+            test.accept(cnx);
+
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            fail("Error using reflection on ClientCnx", e);
+        } finally {
+            eventLoop.shutdownGracefully();
+        }
+    }
+
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
index 4c174ff0ac3..4d9977f5478 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
@@ -119,7 +119,7 @@ public class PulsarClientImplTest {
                         new GetTopicsResult(Collections.emptyList(), null, 
false, true)));
         when(lookup.getPartitionedTopicMetadata(any(TopicName.class)))
                 .thenReturn(CompletableFuture.completedFuture(new 
PartitionedTopicMetadata()));
-        when(lookup.getBroker(any(TopicName.class)))
+        when(lookup.getBroker(any()))
                 .thenReturn(CompletableFuture.completedFuture(
                         Pair.of(mock(InetSocketAddress.class), 
mock(InetSocketAddress.class))));
         ConnectionPool pool = mock(ConnectionPool.class);
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
new file mode 100644
index 00000000000..1d245350c82
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.channel.ChannelHandlerContext;
+import 
org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl.TopicsChangedListener;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.api.proto.BaseCommand;
+import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
+import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate;
+import org.apache.pulsar.common.naming.NamespaceName;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertTrue;
+import org.mockito.ArgumentCaptor;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.regex.Pattern;
+
+/**
+ * Unit tests for {@link TopicListWatcher} that replace the client, the connection and the
+ * listener with Mockito mocks, so no broker is involved.
+ */
+public class TopicListWatcherTest {
+
+    private CompletableFuture<ClientCnx> clientCnxFuture;
+    private TopicListWatcher watcher;
+    private PulsarClientImpl client;
+    private CompletableFuture<TopicListWatcher> watcherFuture;
+    private TopicsChangedListener listener;
+
+    /**
+     * Creates a fresh watcher before every test. The mocked client hands out
+     * {@code clientCnxFuture}, which each test completes itself to simulate a connection.
+     */
+    @BeforeMethod(alwaysRun = true)
+    public void setup() {
+        listener = mock(TopicsChangedListener.class);
+        client = mock(PulsarClientImpl.class);
+        when(client.getConfiguration()).thenReturn(new 
ClientConfigurationData());
+        clientCnxFuture = new CompletableFuture<>();
+        when(client.getConnectionToServiceUrl()).thenReturn(clientCnxFuture);
+        watcherFuture = new CompletableFuture<>();
+        // 7 matches the watcher id used in the broker responses built by the tests below.
+        // NOTE(review): the null argument is presumably the initial topics hash - confirm against
+        // the TopicListWatcher constructor.
+        watcher = new TopicListWatcher(listener, client,
+                Pattern.compile("persistent://tenant/ns/topic\\d+"), 7,
+                NamespaceName.get("tenant/ns"), null, watcherFuture);
+    }
+
+    /**
+     * Checks that the client mock was asked for a connection; the only code run on it after
+     * stubbing is the watcher constructor in {@link #setup()}.
+     */
+    @Test
+    public void testWatcherGrabsConnection() {
+        verify(client).getConnectionToServiceUrl();
+    }
+
+    /**
+     * Completes the connection future, checks that a watch-topic-list command was issued on the
+     * connection, answers it with a success response and expects the watcher future to have
+     * completed normally.
+     */
+    @Test
+    public void testWatcherCreatesBrokerSideObjectWhenConnected() {
+        ClientCnx clientCnx = mock(ClientCnx.class);
+        CompletableFuture<CommandWatchTopicListSuccess> responseFuture = new 
CompletableFuture<>();
+        ArgumentCaptor<BaseCommand> commandCaptor = 
ArgumentCaptor.forClass(BaseCommand.class);
+        when(clientCnx.newWatchTopicList(any(BaseCommand.class), 
anyLong())).thenReturn(responseFuture);
+        when(clientCnx.ctx()).thenReturn(mock(ChannelHandlerContext.class));
+        // Simulates the connection becoming available.
+        clientCnxFuture.complete(clientCnx);
+        verify(clientCnx).newWatchTopicList(commandCaptor.capture(), 
anyLong());
+        // The response echoes the request id captured from the command the watcher sent.
+        CommandWatchTopicListSuccess success = new 
CommandWatchTopicListSuccess()
+                .setWatcherId(7)
+                
.setRequestId(commandCaptor.getValue().getWatchTopicList().getRequestId())
+                .setTopicsHash("FEED");
+        success.addTopic("persistent://tenant/ns/topic11");
+        responseFuture.complete(success);
+        assertTrue(watcherFuture.isDone() && 
!watcherFuture.isCompletedExceptionally());
+    }
+
+    /**
+     * Runs the same registration sequence as the previous test, then feeds the watcher an update
+     * with one new topic and expects the listener to be told about exactly that topic.
+     */
+    @Test
+    public void testWatcherCallsListenerOnUpdate() {
+        ClientCnx clientCnx = mock(ClientCnx.class);
+        CompletableFuture<CommandWatchTopicListSuccess> responseFuture = new 
CompletableFuture<>();
+        ArgumentCaptor<BaseCommand> commandCaptor = 
ArgumentCaptor.forClass(BaseCommand.class);
+        when(clientCnx.newWatchTopicList(any(BaseCommand.class), 
anyLong())).thenReturn(responseFuture);
+        when(clientCnx.ctx()).thenReturn(mock(ChannelHandlerContext.class));
+        clientCnxFuture.complete(clientCnx);
+        verify(clientCnx).newWatchTopicList(commandCaptor.capture(), 
anyLong());
+        CommandWatchTopicListSuccess success = new 
CommandWatchTopicListSuccess()
+                .setWatcherId(7)
+                
.setRequestId(commandCaptor.getValue().getWatchTopicList().getRequestId())
+                .setTopicsHash("FEED");
+        success.addTopic("persistent://tenant/ns/topic11");
+        responseFuture.complete(success);
+
+        // Update carrying a single added topic that matches the watcher's pattern.
+        CommandWatchTopicUpdate update = new CommandWatchTopicUpdate()
+                .setTopicsHash("F33D")
+                .setWatcherId(7)
+                
.addAllNewTopics(Collections.singleton("persistent://tenant/ns/topic12"));
+
+        watcher.handleCommandWatchTopicUpdate(update);
+        
verify(listener).onTopicsAdded(Collections.singletonList("persistent://tenant/ns/topic12"));
+    }
+
+
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 46c22375b77..d8151579353 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -263,11 +263,12 @@ public class Commands {
         return serializeWithSize(cmd);
     }
 
-    public static ByteBuf newConnected(int clientProtocoVersion) {
-        return newConnected(clientProtocoVersion, INVALID_MAX_MESSAGE_SIZE);
+    public static ByteBuf newConnected(int clientProtocoVersion,  boolean 
supportsTopicWatchers) {
+        return newConnected(clientProtocoVersion, INVALID_MAX_MESSAGE_SIZE, 
supportsTopicWatchers);
     }
 
-    public static BaseCommand newConnectedCommand(int clientProtocolVersion, 
int maxMessageSize) {
+    public static BaseCommand newConnectedCommand(int clientProtocolVersion, 
int maxMessageSize,
+                                                  boolean 
supportsTopicWatchers) {
         BaseCommand cmd = localCmd(Type.CONNECTED);
         CommandConnected connected = cmd.setConnected()
                 .setServerVersion("Pulsar Server" + 
PulsarVersion.getVersion());
@@ -282,11 +283,13 @@ public class Commands {
         int versionToAdvertise = Math.min(currentProtocolVersion, 
clientProtocolVersion);
 
         connected.setProtocolVersion(versionToAdvertise);
+
+        
connected.setFeatureFlags().setSupportsTopicWatchers(supportsTopicWatchers);
         return cmd;
     }
 
-    public static ByteBuf newConnected(int clientProtocolVersion, int 
maxMessageSize) {
-        return serializeWithSize(newConnectedCommand(clientProtocolVersion, 
maxMessageSize));
+    public static ByteBuf newConnected(int clientProtocolVersion, int 
maxMessageSize,  boolean supportsTopicWatchers) {
+        return serializeWithSize(newConnectedCommand(clientProtocolVersion, 
maxMessageSize, supportsTopicWatchers));
     }
 
     public static ByteBuf newAuthChallenge(String authMethod, AuthData 
brokerData, int clientProtocolVersion) {
@@ -1474,6 +1477,55 @@ public class Commands {
         return serializeWithSize(cmd);
     }
 
+    /**
+     * Builds a {@code WATCH_TOPIC_LIST} command.
+     *
+     * @param requestId     request id stored in the command
+     * @param watcherId     watcher id stored in the command
+     * @param namespace     namespace stored in the command
+     * @param topicsPattern topics pattern stored in the command
+     * @param topicsHash    topics hash; the field is left unset when this is {@code null}
+     * @return the populated command obtained from {@code localCmd}
+     */
+    public static BaseCommand newWatchTopicList(
+            long requestId, long watcherId, String namespace, String topicsPattern, String topicsHash) {
+        BaseCommand watchCmd = localCmd(Type.WATCH_TOPIC_LIST);
+        watchCmd.setWatchTopicList()
+                .setRequestId(requestId)
+                .setNamespace(namespace)
+                .setTopicsPattern(topicsPattern)
+                .setWatcherId(watcherId);
+        // The hash is an optional field: nothing more to fill in when the caller has none.
+        if (topicsHash == null) {
+            return watchCmd;
+        }
+        watchCmd.getWatchTopicList().setTopicsHash(topicsHash);
+        return watchCmd;
+    }
+
+
+    /**
+     * Builds a {@code WATCH_TOPIC_LIST_SUCCESS} command.
+     *
+     * @param requestId  request id stored in the command
+     * @param watcherId  watcher id stored in the command
+     * @param topicsHash topics hash; the field is left unset when this is {@code null}
+     * @param topics     topics to add to the command; {@code null} or empty adds none
+     * @return the populated command obtained from {@code localCmd}
+     */
+    public static BaseCommand newWatchTopicListSuccess(long requestId, long watcherId, String topicsHash,
+                                                       List<String> topics) {
+        BaseCommand cmd = localCmd(Type.WATCH_TOPIC_LIST_SUCCESS);
+        cmd.setWatchTopicListSuccess()
+                .setRequestId(requestId)
+                .setWatcherId(watcherId);
+        // NOTE(review): topics_hash is declared `required` on CommandWatchTopicListSuccess in
+        // PulsarApi.proto, so a command built with a null hash presumably fails required-field
+        // validation when serialized - confirm whether null can actually reach this method.
+        if (topicsHash != null) {
+            cmd.getWatchTopicListSuccess().setTopicsHash(topicsHash);
+        }
+        if (topics != null && !topics.isEmpty()) {
+            cmd.getWatchTopicListSuccess().addAllTopics(topics);
+        }
+        return cmd;
+    }
+
+
+    /**
+     * Builds a {@code WATCH_TOPIC_UPDATE} command.
+     *
+     * <p>Like {@code newWatchTopicListSuccess}, a {@code null} or empty topic list simply adds
+     * nothing to the corresponding repeated field instead of being handed to {@code addAll...}.
+     *
+     * @param watcherId     watcher id stored in the command
+     * @param newTopics     topics to record as new; may be {@code null} or empty
+     * @param deletedTopics topics to record as deleted; may be {@code null} or empty
+     * @param topicsHash    topics hash, always set on the command (the proto field is required)
+     * @return the populated command obtained from {@code localCmd}
+     */
+    public static BaseCommand newWatchTopicUpdate(long watcherId,
+                                              List<String> newTopics, List<String> deletedTopics, String topicsHash) {
+        BaseCommand cmd = localCmd(Type.WATCH_TOPIC_UPDATE);
+        cmd.setWatchTopicUpdate()
+                .setWatcherId(watcherId)
+                .setTopicsHash(topicsHash);
+        if (newTopics != null && !newTopics.isEmpty()) {
+            cmd.getWatchTopicUpdate().addAllNewTopics(newTopics);
+        }
+        if (deletedTopics != null && !deletedTopics.isEmpty()) {
+            cmd.getWatchTopicUpdate().addAllDeletedTopics(deletedTopics);
+        }
+        return cmd;
+    }
+
+
+    /**
+     * Builds a {@code WATCH_TOPIC_LIST_CLOSE} command.
+     *
+     * @param watcherId watcher id stored in the command
+     * @param requestId request id stored in the command
+     * @return the populated command obtained from {@code localCmd}
+     */
+    public static BaseCommand newWatchTopicListClose(long watcherId, long requestId) {
+        BaseCommand closeCmd = localCmd(Type.WATCH_TOPIC_LIST_CLOSE);
+        closeCmd.setWatchTopicListClose().setRequestId(requestId).setWatcherId(watcherId);
+        return closeCmd;
+    }
+
+
     public static ByteBuf serializeWithSize(BaseCommand cmd) {
         // / Wire format
         // [TOTAL_SIZE] [CMD_SIZE][CMD]
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index 9fb22d1a289..d0b26103461 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -77,6 +77,10 @@ import org.apache.pulsar.common.api.proto.CommandSuccess;
 import org.apache.pulsar.common.api.proto.CommandTcClientConnectRequest;
 import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse;
 import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
+import org.apache.pulsar.common.api.proto.CommandWatchTopicList;
+import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
+import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
+import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.intercept.InterceptException;
 import org.slf4j.Logger;
@@ -432,6 +436,27 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
                 checkArgument(cmd.hasEndTxnOnSubscriptionResponse());
                 
handleEndTxnOnSubscriptionResponse(cmd.getEndTxnOnSubscriptionResponse());
                 break;
+
+            case WATCH_TOPIC_LIST:
+                checkArgument(cmd.hasWatchTopicList());
+                handleCommandWatchTopicList(cmd.getWatchTopicList());
+                break;
+
+            case WATCH_TOPIC_LIST_SUCCESS:
+                checkArgument(cmd.hasWatchTopicListSuccess());
+                
handleCommandWatchTopicListSuccess(cmd.getWatchTopicListSuccess());
+                break;
+
+            case WATCH_TOPIC_UPDATE:
+                checkArgument(cmd.hasWatchTopicUpdate());
+                handleCommandWatchTopicUpdate(cmd.getWatchTopicUpdate());
+                break;
+
+            case WATCH_TOPIC_LIST_CLOSE:
+                checkArgument(cmd.hasWatchTopicListClose());
+                handleCommandWatchTopicListClose(cmd.getWatchTopicListClose());
+                break;
+
             default:
                 break;
             }
@@ -672,5 +697,22 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
         throw new UnsupportedOperationException();
     }
 
+    /**
+     * Called by the command dispatch in this class for {@code WATCH_TOPIC_LIST} commands.
+     *
+     * <p>This base implementation always throws; a subclass that supports the command is
+     * expected to override it.
+     *
+     * @param commandWatchTopicList the decoded command payload
+     * @throws UnsupportedOperationException always, in this base implementation
+     */
+    protected void handleCommandWatchTopicList(CommandWatchTopicList 
commandWatchTopicList) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Called by the command dispatch in this class for {@code WATCH_TOPIC_LIST_SUCCESS} commands.
+     *
+     * <p>This base implementation always throws; a subclass that supports the command is
+     * expected to override it.
+     *
+     * @param commandWatchTopicListSuccess the decoded command payload
+     * @throws UnsupportedOperationException always, in this base implementation
+     */
+    protected void handleCommandWatchTopicListSuccess(
+            CommandWatchTopicListSuccess commandWatchTopicListSuccess) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Called by the command dispatch in this class for {@code WATCH_TOPIC_UPDATE} commands.
+     *
+     * <p>This base implementation always throws; a subclass that supports the command is
+     * expected to override it.
+     *
+     * @param commandWatchTopicUpdate the decoded command payload
+     * @throws UnsupportedOperationException always, in this base implementation
+     */
+    protected void handleCommandWatchTopicUpdate(CommandWatchTopicUpdate 
commandWatchTopicUpdate) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Called by the command dispatch in this class for {@code WATCH_TOPIC_LIST_CLOSE} commands.
+     *
+     * <p>This base implementation always throws; a subclass that supports the command is
+     * expected to override it.
+     *
+     * @param commandWatchTopicListClose the decoded command payload
+     * @throws UnsupportedOperationException always, in this base implementation
+     */
+    protected void handleCommandWatchTopicListClose(CommandWatchTopicListClose 
commandWatchTopicListClose) {
+        throw new UnsupportedOperationException();
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(PulsarDecoder.class);
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
index ed373867898..574e4d5bd26 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
@@ -32,6 +32,8 @@ import org.apache.pulsar.common.naming.TopicName;
 @UtilityClass
 public class TopicList {
 
+    /**
+     * Regular-expression source {@code ".*"}, which matches any sequence of characters
+     * (line terminators excluded under default flags) and therefore any topic name.
+     */
+    public static final String ALL_TOPICS_PATTERN = ".*";
+
     private static final String SCHEME_SEPARATOR = "://";
 
     private static final Pattern SCHEME_SEPARATOR_PATTERN = 
Pattern.compile(Pattern.quote(SCHEME_SEPARATOR));
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index fc8e5a5b7c6..03cc6cc54da 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -294,12 +294,14 @@ message FeatureFlags {
   optional bool supports_auth_refresh = 1 [default = false];
   optional bool supports_broker_entry_metadata = 2 [default = false];
   optional bool supports_partial_producer = 3 [default = false];
+  optional bool supports_topic_watchers = 4 [default = false];
 }
 
 message CommandConnected {
     required string server_version = 1;
     optional int32 protocol_version = 2 [default = 0];
     optional int32 max_message_size = 3;
+    optional FeatureFlags feature_flags = 4;
 }
 
 message CommandAuthResponse {
@@ -757,6 +759,34 @@ message CommandGetTopicsOfNamespaceResponse {
     optional bool   changed        = 5 [default = true];
 }
 
+// Carried in BaseCommand with type WATCH_TOPIC_LIST.
+// Names a namespace and a topics pattern, identified by a request id and a watcher id.
+// NOTE(review): presumably sent by the client to register a topic list watcher on the
+// broker - confirm against the PIP-145 protocol description.
+message CommandWatchTopicList {
+    required uint64 request_id     = 1;
+    required uint64 watcher_id     = 2;
+    required string namespace      = 3;
+    required string topics_pattern = 4;
+    // Only present when the client reconnects:
+    optional string topics_hash    = 5;
+}
+
+// Carried in BaseCommand with type WATCH_TOPIC_LIST_SUCCESS.
+// Holds a request id, a watcher id, a list of topics and a mandatory topics hash.
+// NOTE(review): presumably the broker's reply to CommandWatchTopicList, echoing its
+// request_id - confirm against the PIP-145 protocol description.
+message CommandWatchTopicListSuccess {
+    required uint64 request_id     = 1;
+    required uint64 watcher_id     = 2;
+    repeated string topic          = 3;
+    required string topics_hash    = 4;
+}
+
+// Carried in BaseCommand with type WATCH_TOPIC_UPDATE.
+// Lists new and deleted topics for one watcher, together with a mandatory topics hash.
+// It has no request_id field, unlike the other watch commands.
+// NOTE(review): presumably pushed by the broker without a preceding request - confirm.
+message CommandWatchTopicUpdate {
+    required uint64 watcher_id     = 1;
+    repeated string new_topics     = 2;
+    repeated string deleted_topics = 3;
+    required string topics_hash    = 4;
+}
+
+// Carried in BaseCommand with type WATCH_TOPIC_LIST_CLOSE.
+// Identifies a watcher by id and carries a request id.
+// NOTE(review): presumably sent by the client to deregister the watcher - confirm.
+message CommandWatchTopicListClose {
+    required uint64 request_id     = 1;
+    required uint64 watcher_id     = 2;
+}
+
 message CommandGetSchema {
     required uint64 request_id = 1;
     required string topic      = 2;
@@ -987,6 +1017,11 @@ message BaseCommand {
         TC_CLIENT_CONNECT_REQUEST = 62;
         TC_CLIENT_CONNECT_RESPONSE = 63;
 
+        WATCH_TOPIC_LIST = 64;
+        WATCH_TOPIC_LIST_SUCCESS = 65;
+        WATCH_TOPIC_UPDATE = 66;
+        WATCH_TOPIC_LIST_CLOSE = 67;
+
     }
 
 
@@ -1063,4 +1098,9 @@ message BaseCommand {
     optional CommandEndTxnOnSubscriptionResponse endTxnOnSubscriptionResponse 
= 61;
     optional CommandTcClientConnectRequest tcClientConnectRequest = 62;
     optional CommandTcClientConnectResponse tcClientConnectResponse = 63;
+
+    optional CommandWatchTopicList watchTopicList = 64;
+    optional CommandWatchTopicListSuccess watchTopicListSuccess = 65;
+    optional CommandWatchTopicUpdate watchTopicUpdate = 66;
+    optional CommandWatchTopicListClose watchTopicListClose = 67;
 }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index d3c83198773..53c8d1bd418 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -371,7 +371,7 @@ public class ProxyConnection extends PulsarHandler {
             // partitions metadata lookups
             state = State.ProxyLookupRequests;
             lookupProxyHandler = new LookupProxyHandler(service, this);
-            
ctx.writeAndFlush(Commands.newConnected(protocolVersionToAdvertise))
+            
ctx.writeAndFlush(Commands.newConnected(protocolVersionToAdvertise, false))
                     
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
         }
     }
@@ -383,7 +383,8 @@ public class ProxyConnection extends PulsarHandler {
             state = State.ProxyConnectionToBroker;
             int maxMessageSize =
                     connected.hasMaxMessageSize() ? 
connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
-            
ctx.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), 
maxMessageSize))
+            
ctx.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), 
maxMessageSize,
+                            connected.hasFeatureFlags() && 
connected.getFeatureFlags().isSupportsTopicWatchers()))
                     
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
         } else {
             LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. "

Reply via email to