jiazhai closed pull request #2219: Cpp client: add 
PatternMultiTopicsConsumerImpl to support regex subscribe
URL: https://github.com/apache/incubator-pulsar/pull/2219
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-cpp/include/pulsar/Client.h 
b/pulsar-client-cpp/include/pulsar/Client.h
index 6a9e4878b7..1913851323 100644
--- a/pulsar-client-cpp/include/pulsar/Client.h
+++ b/pulsar-client-cpp/include/pulsar/Client.h
@@ -99,6 +99,9 @@ class Client {
     void subscribeAsync(const std::string& topic, const std::string& 
consumerName,
                         const ConsumerConfiguration& conf, SubscribeCallback 
callback);
 
+    /**
+     * subscribe for multiple topics under the same namespace.
+     */
     Result subscribe(const std::vector<std::string>& topics, const 
std::string& subscriptionName,
                      Consumer& consumer);
     Result subscribe(const std::vector<std::string>& topics, const 
std::string& subscriptionName,
@@ -108,6 +111,19 @@ class Client {
     void subscribeAsync(const std::vector<std::string>& topics, const 
std::string& subscriptionName,
                         const ConsumerConfiguration& conf, SubscribeCallback 
callback);
 
+    /**
+     * Subscribe to all topics under the same namespace that match the given 
regexPattern.
+     */
+    Result subscribeWithRegex(const std::string& regexPattern, const 
std::string& consumerName,
+                              Consumer& consumer);
+    Result subscribeWithRegex(const std::string& regexPattern, const 
std::string& consumerName,
+                              const ConsumerConfiguration& conf, Consumer& 
consumer);
+
+    void subscribeWithRegexAsync(const std::string& regexPattern, const 
std::string& consumerName,
+                                 SubscribeCallback callback);
+    void subscribeWithRegexAsync(const std::string& regexPattern, const 
std::string& consumerName,
+                                 const ConsumerConfiguration& conf, 
SubscribeCallback callback);
+
     /**
      * Create a topic reader with given {@code ReaderConfiguration} for 
reading messages from the specified
      * topic.
diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h 
b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index c9584e38f2..36e580897b 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -152,6 +152,16 @@ class ConsumerConfiguration {
     bool isReadCompacted() const;
     void setReadCompacted(bool compacted);
 
+    /**
+     * Set the time period in seconds, at which the 
PatternMultiTopicsConsumer will do a pattern auto
+     * discovery.
+     * The default value is 60 seconds. A value less than 0 will disable auto 
discovery.
+     *
+     * @param periodInSeconds       period in seconds to do an auto discovery
+     */
+    void setPatternAutoDiscoveryPeriod(int periodInSeconds);
+    int getPatternAutoDiscoveryPeriod() const;
+
     friend class PulsarWrapper;
 
    private:
diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc 
b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
index c4bef3070d..296faee6cf 100644
--- a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
+++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
@@ -152,4 +152,46 @@ uint64_t BinaryProtoLookupService::newRequestId() {
     Lock lock(mutex_);
     return ++requestIdGenerator_;
 }
+
+Future<Result, NamespaceTopicsPtr> 
BinaryProtoLookupService::getTopicsOfNamespaceAsync(
+    const NamespaceNamePtr& nsName) {
+    NamespaceTopicsPromisePtr promise = boost::make_shared<Promise<Result, 
NamespaceTopicsPtr>>();
+    if (!nsName) {
+        promise->setFailed(ResultInvalidTopicName);
+        return promise->getFuture();
+    }
+    std::string namespaceName = nsName->toString();
+    Future<Result, ClientConnectionWeakPtr> future = 
cnxPool_.getConnectionAsync(serviceUrl_, serviceUrl_);
+    
future.addListener(boost::bind(&BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest,
 this,
+                                   namespaceName, _1, _2, promise));
+    return promise->getFuture();
+}
+
+void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const 
std::string& nsName, Result result,
+                                                               const 
ClientConnectionWeakPtr& clientCnx,
+                                                               
NamespaceTopicsPromisePtr promise) {
+    if (result != ResultOk) {
+        promise->setFailed(ResultConnectError);
+        return;
+    }
+
+    ClientConnectionPtr conn = clientCnx.lock();
+    uint64_t requestId = newRequestId();
+    LOG_DEBUG("sendGetTopicsOfNamespaceRequest. requestId: " << requestId << " 
nsName: " << nsName);
+
+    conn->newGetTopicsOfNamespace(nsName, requestId)
+        .addListener(
+            
boost::bind(&BinaryProtoLookupService::getTopicsOfNamespaceListener, this, _1, 
_2, promise));
+}
+
+void BinaryProtoLookupService::getTopicsOfNamespaceListener(Result result, 
NamespaceTopicsPtr topicsPtr,
+                                                            
NamespaceTopicsPromisePtr promise) {
+    if (result != ResultOk) {
+        promise->setFailed(ResultLookupError);
+        return;
+    }
+
+    promise->setValue(topicsPtr);
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.h 
b/pulsar-client-cpp/lib/BinaryProtoLookupService.h
index f647c626b4..36fb5e81d7 100644
--- a/pulsar-client-cpp/lib/BinaryProtoLookupService.h
+++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.h
@@ -40,6 +40,8 @@ class BinaryProtoLookupService : public LookupService {
 
     Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const 
TopicNamePtr& topicName);
 
+    Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const 
NamespaceNamePtr& nsName);
+
    private:
     boost::mutex mutex_;
     uint64_t requestIdGenerator_;
@@ -61,6 +63,13 @@ class BinaryProtoLookupService : public LookupService {
                                        const ClientConnectionWeakPtr& 
clientCnx,
                                        LookupDataResultPromisePtr promise);
 
+    void sendGetTopicsOfNamespaceRequest(const std::string& nsName, Result 
result,
+                                         const ClientConnectionWeakPtr& 
clientCnx,
+                                         NamespaceTopicsPromisePtr promise);
+
+    void getTopicsOfNamespaceListener(Result result, NamespaceTopicsPtr 
topicsPtr,
+                                      NamespaceTopicsPromisePtr promise);
+
     uint64_t newRequestId();
 };
 typedef boost::shared_ptr<BinaryProtoLookupService> 
BinaryProtoLookupServicePtr;
diff --git a/pulsar-client-cpp/lib/Client.cc b/pulsar-client-cpp/lib/Client.cc
index bba35203e1..5cfe01f18c 100644
--- a/pulsar-client-cpp/lib/Client.cc
+++ b/pulsar-client-cpp/lib/Client.cc
@@ -114,6 +114,30 @@ void Client::subscribeAsync(const 
std::vector<std::string>& topics, const std::s
     impl_->subscribeAsync(topics, subscriptionName, conf, callback);
 }
 
+Result Client::subscribeWithRegex(const std::string& regexPattern, const 
std::string& subscriptionName,
+                                  Consumer& consumer) {
+    return subscribeWithRegex(regexPattern, subscriptionName, 
ConsumerConfiguration(), consumer);
+}
+
+Result Client::subscribeWithRegex(const std::string& regexPattern, const 
std::string& subscriptionName,
+                                  const ConsumerConfiguration& conf, Consumer& 
consumer) {
+    Promise<Result, Consumer> promise;
+    subscribeWithRegexAsync(regexPattern, subscriptionName, conf, 
WaitForCallbackValue<Consumer>(promise));
+    Future<Result, Consumer> future = promise.getFuture();
+
+    return future.get(consumer);
+}
+
+void Client::subscribeWithRegexAsync(const std::string& regexPattern, const 
std::string& subscriptionName,
+                                     SubscribeCallback callback) {
+    subscribeWithRegexAsync(regexPattern, subscriptionName, 
ConsumerConfiguration(), callback);
+}
+
+void Client::subscribeWithRegexAsync(const std::string& regexPattern, const 
std::string& subscriptionName,
+                                     const ConsumerConfiguration& conf, 
SubscribeCallback callback) {
+    impl_->subscribeWithRegexAsync(regexPattern, subscriptionName, conf, 
callback);
+}
+
 Result Client::createReader(const std::string& topic, const MessageId& 
startMessageId,
                             const ReaderConfiguration& conf, Reader& reader) {
     Promise<Result, Reader> promise;
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc 
b/pulsar-client-cpp/lib/ClientConnection.cc
index 4e6d0f23b6..da307079e6 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -219,7 +219,7 @@ void ClientConnection::handlePulsarConnected(const 
CommandConnected& cmdConnecte
 }
 
 void ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> 
consumerStatsRequests) {
-    std::vector<Promise<Result, BrokerConsumerStatsImpl> > 
consumerStatsPromises;
+    std::vector<Promise<Result, BrokerConsumerStatsImpl>> 
consumerStatsPromises;
     Lock lock(mutex_);
 
     for (int i = 0; i < consumerStatsRequests.size(); i++) {
@@ -856,6 +856,7 @@ void ClientConnection::handleIncomingCommand() {
                                         << " -- req_id: " << 
error.request_id());
 
                     Lock lock(mutex_);
+
                     PendingRequestsMap::iterator it = 
pendingRequests_.find(error.request_id());
                     if (it != pendingRequests_.end()) {
                         PendingRequestData requestData = it->second;
@@ -865,19 +866,28 @@ void ClientConnection::handleIncomingCommand() {
                         
requestData.promise.setFailed(getResult(error.error()));
                         requestData.timer->cancel();
                     } else {
-                        PendingGetLastMessageIdRequestsMap::iterator it2 =
+                        PendingGetLastMessageIdRequestsMap::iterator it =
                             
pendingGetLastMessageIdRequests_.find(error.request_id());
-                        if (it2 != pendingGetLastMessageIdRequests_.end()) {
-                            Promise<Result, MessageId> getLastMessageIdPromise 
= it2->second;
-                            pendingGetLastMessageIdRequests_.erase(it2);
+                        if (it != pendingGetLastMessageIdRequests_.end()) {
+                            Promise<Result, MessageId> getLastMessageIdPromise 
= it->second;
+                            pendingGetLastMessageIdRequests_.erase(it);
                             lock.unlock();
 
                             
getLastMessageIdPromise.setFailed(getResult(error.error()));
                         } else {
-                            lock.unlock();
+                            PendingGetNamespaceTopicsMap::iterator it =
+                                
pendingGetNamespaceTopicsRequests_.find(error.request_id());
+                            if (it != 
pendingGetNamespaceTopicsRequests_.end()) {
+                                Promise<Result, NamespaceTopicsPtr> 
getNamespaceTopicsPromise = it->second;
+                                pendingGetNamespaceTopicsRequests_.erase(it);
+                                lock.unlock();
+
+                                
getNamespaceTopicsPromise.setFailed(getResult(error.error()));
+                            } else {
+                                lock.unlock();
+                            }
                         }
                     }
-
                     break;
                 }
 
@@ -978,6 +988,51 @@ void ClientConnection::handleIncomingCommand() {
                     break;
                 }
 
+                case BaseCommand::GET_TOPICS_OF_NAMESPACE_RESPONSE: {
+                    const CommandGetTopicsOfNamespaceResponse& response =
+                        incomingCmd_.gettopicsofnamespaceresponse();
+
+                    LOG_DEBUG(cnxString_ << "Received 
GetTopicsOfNamespaceResponse from server. req_id: "
+                                         << response.request_id() << " 
topicsSize" << response.topics_size());
+
+                    Lock lock(mutex_);
+                    PendingGetNamespaceTopicsMap::iterator it =
+                        
pendingGetNamespaceTopicsRequests_.find(response.request_id());
+
+                    if (it != pendingGetNamespaceTopicsRequests_.end()) {
+                        Promise<Result, NamespaceTopicsPtr> getTopicsPromise = 
it->second;
+                        pendingGetNamespaceTopicsRequests_.erase(it);
+                        lock.unlock();
+
+                        int numTopics = response.topics_size();
+                        std::set<std::string> topicSet;
+                        // get all topics
+                        for (int i = 0; i < numTopics; i++) {
+                            // remove partition part
+                            const std::string& topicName = response.topics(i);
+                            int pos = topicName.find("-partition-");
+                            std::string filteredName = topicName.substr(0, 
pos);
+
+                            // filter duped topic name
+                            if (topicSet.find(filteredName) == topicSet.end()) 
{
+                                topicSet.insert(filteredName);
+                            }
+                        }
+
+                        NamespaceTopicsPtr topicsPtr =
+                            
boost::make_shared<std::vector<std::string>>(topicSet.begin(), topicSet.end());
+
+                        getTopicsPromise.setValue(topicsPtr);
+                    } else {
+                        lock.unlock();
+                        LOG_WARN(
+                            "GetTopicsOfNamespaceResponse command - Received 
unknown request id from "
+                            "server: "
+                            << response.request_id());
+                    }
+                    break;
+                }
+
                 default: {
                     LOG_WARN(cnxString_ << "Received invalid message from 
server");
                     close();
@@ -1281,4 +1336,21 @@ Future<Result, MessageId> 
ClientConnection::newGetLastMessageId(uint64_t consume
     return promise.getFuture();
 }
 
+Future<Result, NamespaceTopicsPtr> 
ClientConnection::newGetTopicsOfNamespace(const std::string& nsName,
+                                                                             
uint64_t requestId) {
+    Lock lock(mutex_);
+    Promise<Result, NamespaceTopicsPtr> promise;
+    if (isClosed()) {
+        lock.unlock();
+        LOG_ERROR(cnxString_ << "Client is not connected to the broker");
+        promise.setFailed(ResultNotConnected);
+        return promise.getFuture();
+    }
+
+    pendingGetNamespaceTopicsRequests_.insert(std::make_pair(requestId, 
promise));
+    lock.unlock();
+    sendCommand(Commands::newGetTopicsOfNamespace(nsName, requestId));
+    return promise.getFuture();
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ClientConnection.h 
b/pulsar-client-cpp/lib/ClientConnection.h
index 860ca6a58b..bdbc8e536b 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -70,6 +70,8 @@ struct OpSendMsg;
 
 typedef std::pair<std::string, int64_t> ResponseData;
 
+typedef boost::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
+
 class ClientConnection : public 
boost::enable_shared_from_this<ClientConnection> {
     enum State
     {
@@ -81,7 +83,7 @@ class ClientConnection : public 
boost::enable_shared_from_this<ClientConnection>
 
    public:
     typedef boost::shared_ptr<boost::asio::ip::tcp::socket> SocketPtr;
-    typedef 
boost::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket&> > 
TlsSocketPtr;
+    typedef 
boost::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket&>> 
TlsSocketPtr;
     typedef boost::shared_ptr<ClientConnection> ConnectionPtr;
     typedef boost::function<void(const boost::system::error_code&, 
ConnectionPtr)> ConnectionListener;
     typedef std::vector<ConnectionListener>::iterator ListenerIterator;
@@ -144,6 +146,8 @@ class ClientConnection : public 
boost::enable_shared_from_this<ClientConnection>
 
     Future<Result, MessageId> newGetLastMessageId(uint64_t consumerId, 
uint64_t requestId);
 
+    Future<Result, NamespaceTopicsPtr> newGetTopicsOfNamespace(const 
std::string& nsName, uint64_t requestId);
+
    private:
     struct PendingRequestData {
         Promise<Result, ResponseData> promise;
@@ -264,12 +268,15 @@ class ClientConnection : public 
boost::enable_shared_from_this<ClientConnection>
     typedef std::map<long, ConsumerImplWeakPtr> ConsumersMap;
     ConsumersMap consumers_;
 
-    typedef std::map<uint64_t, Promise<Result, BrokerConsumerStatsImpl> > 
PendingConsumerStatsMap;
+    typedef std::map<uint64_t, Promise<Result, BrokerConsumerStatsImpl>> 
PendingConsumerStatsMap;
     PendingConsumerStatsMap pendingConsumerStatsMap_;
 
-    typedef std::map<long, Promise<Result, MessageId> > 
PendingGetLastMessageIdRequestsMap;
+    typedef std::map<long, Promise<Result, MessageId>> 
PendingGetLastMessageIdRequestsMap;
     PendingGetLastMessageIdRequestsMap pendingGetLastMessageIdRequests_;
 
+    typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> 
PendingGetNamespaceTopicsMap;
+    PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_;
+
     boost::mutex mutex_;
     typedef boost::unique_lock<boost::mutex> Lock;
 
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc 
b/pulsar-client-cpp/lib/ClientImpl.cc
index 376892633b..ec113fc4e6 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -25,6 +25,7 @@
 #include "PartitionedProducerImpl.h"
 #include "PartitionedConsumerImpl.h"
 #include "MultiTopicsConsumerImpl.h"
+#include "PatternMultiTopicsConsumerImpl.h"
 #include "SimpleLoggerImpl.h"
 #include "Log4CxxLogger.h"
 #include <boost/bind.hpp>
@@ -35,6 +36,7 @@
 #include <lib/HTTPLookupService.h>
 #include <lib/TopicName.h>
 #include <algorithm>
+#include <regex>
 
 DECLARE_LOG_OBJECT()
 
@@ -119,6 +121,9 @@ ExecutorServiceProviderPtr 
ClientImpl::getListenerExecutorProvider() { return li
 ExecutorServiceProviderPtr ClientImpl::getPartitionListenerExecutorProvider() {
     return partitionListenerExecutorProvider_;
 }
+
+LookupServicePtr ClientImpl::getLookup() { return lookupServicePtr_; }
+
 void ClientImpl::createProducerAsync(const std::string& topic, 
ProducerConfiguration conf,
                                      CreateProducerCallback callback) {
     TopicNamePtr topicName;
@@ -212,6 +217,59 @@ void ClientImpl::handleReaderMetadataLookup(const Result 
result, const LookupDat
     consumers_.push_back(reader->getConsumer());
 }
 
+void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, 
const std::string& consumerName,
+                                         const ConsumerConfiguration& conf, 
SubscribeCallback callback) {
+    TopicNamePtr topicNamePtr = TopicName::get(regexPattern);
+
+    Lock lock(mutex_);
+    if (state_ != Open) {
+        lock.unlock();
+        callback(ResultAlreadyClosed, Consumer());
+        return;
+    } else {
+        lock.unlock();
+        if (!topicNamePtr) {
+            LOG_ERROR("Topic pattern not valid: " << regexPattern);
+            callback(ResultInvalidTopicName, Consumer());
+            return;
+        }
+    }
+
+    NamespaceNamePtr nsName = topicNamePtr->getNamespaceName();
+
+    lookupServicePtr_->getTopicsOfNamespaceAsync(nsName).addListener(
+        boost::bind(&ClientImpl::createPatternMultiTopicsConsumer, 
shared_from_this(), _1, _2, regexPattern,
+                    consumerName, conf, callback));
+}
+
+void ClientImpl::createPatternMultiTopicsConsumer(const Result result, const 
NamespaceTopicsPtr topics,
+                                                  const std::string& 
regexPattern,
+                                                  const std::string& 
consumerName,
+                                                  const ConsumerConfiguration& 
conf,
+                                                  SubscribeCallback callback) {
+    if (result == ResultOk) {
+        ConsumerImplBasePtr consumer;
+
+        std::regex pattern(regexPattern);
+
+        NamespaceTopicsPtr matchTopics =
+            PatternMultiTopicsConsumerImpl::topicsPatternFilter(*topics, 
pattern);
+
+        consumer = boost::make_shared<PatternMultiTopicsConsumerImpl>(
+            shared_from_this(), regexPattern, *matchTopics, consumerName, 
conf, lookupServicePtr_);
+
+        consumer->getConsumerCreatedFuture().addListener(
+            boost::bind(&ClientImpl::handleConsumerCreated, 
shared_from_this(), _1, _2, callback, consumer));
+        Lock lock(mutex_);
+        consumers_.push_back(consumer);
+        lock.unlock();
+        consumer->start();
+    } else {
+        LOG_ERROR("Error Getting topicsOfNameSpace while 
createPatternMultiTopicsConsumer:  " << result);
+        callback(result, Consumer());
+    }
+}
+
 void ClientImpl::subscribeAsync(const std::vector<std::string>& topics, const 
std::string& consumerName,
                                 const ConsumerConfiguration& conf, 
SubscribeCallback callback) {
     TopicNamePtr topicNamePtr;
diff --git a/pulsar-client-cpp/lib/ClientImpl.h 
b/pulsar-client-cpp/lib/ClientImpl.h
index 550298b67f..54d459d097 100644
--- a/pulsar-client-cpp/lib/ClientImpl.h
+++ b/pulsar-client-cpp/lib/ClientImpl.h
@@ -60,6 +60,9 @@ class ClientImpl : public 
boost::enable_shared_from_this<ClientImpl> {
     void subscribeAsync(const std::vector<std::string>& topics, const 
std::string& consumerName,
                         const ConsumerConfiguration& conf, SubscribeCallback 
callback);
 
+    void subscribeWithRegexAsync(const std::string& regexPattern, const 
std::string& consumerName,
+                                 const ConsumerConfiguration& conf, 
SubscribeCallback callback);
+
     void createReaderAsync(const std::string& topic, const MessageId& 
startMessageId,
                            const ReaderConfiguration& conf, ReaderCallback 
callback);
 
@@ -82,6 +85,7 @@ class ClientImpl : public 
boost::enable_shared_from_this<ClientImpl> {
     ExecutorServiceProviderPtr getIOExecutorProvider();
     ExecutorServiceProviderPtr getListenerExecutorProvider();
     ExecutorServiceProviderPtr getPartitionListenerExecutorProvider();
+    LookupServicePtr getLookup();
     friend class PulsarFriend;
 
    private:
@@ -106,6 +110,10 @@ class ClientImpl : public 
boost::enable_shared_from_this<ClientImpl> {
 
     void handleClose(Result result, SharedInt remaining, ResultCallback 
callback);
 
+    void createPatternMultiTopicsConsumer(const Result result, const 
NamespaceTopicsPtr topics,
+                                          const std::string& regexPattern, 
const std::string& consumerName,
+                                          const ConsumerConfiguration& conf, 
SubscribeCallback callback);
+
     enum State
     {
         Open,
diff --git a/pulsar-client-cpp/lib/Commands.cc 
b/pulsar-client-cpp/lib/Commands.cc
index 13bf99a870..8a1933bde7 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -324,6 +324,18 @@ SharedBuffer Commands::newGetLastMessageId(uint64_t 
consumerId, uint64_t request
     return buffer;
 }
 
+SharedBuffer Commands::newGetTopicsOfNamespace(const std::string& nsName, 
uint64_t requestId) {
+    BaseCommand cmd;
+    cmd.set_type(BaseCommand::GET_TOPICS_OF_NAMESPACE);
+    CommandGetTopicsOfNamespace* getTopics = 
cmd.mutable_gettopicsofnamespace();
+    getTopics->set_request_id(requestId);
+    getTopics->set_namespace_(nsName);
+
+    const SharedBuffer buffer = writeMessageWithSize(cmd);
+    cmd.clear_gettopicsofnamespace();
+    return buffer;
+}
+
 std::string Commands::messageType(BaseCommand_Type type) {
     switch (type) {
         case BaseCommand::CONNECT:
@@ -416,6 +428,12 @@ std::string Commands::messageType(BaseCommand_Type type) {
         case BaseCommand::GET_LAST_MESSAGE_ID_RESPONSE:
             return "GET_LAST_MESSAGE_ID_RESPONSE";
             break;
+        case BaseCommand::GET_TOPICS_OF_NAMESPACE:
+            return "GET_TOPICS_OF_NAMESPACE";
+            break;
+        case BaseCommand::GET_TOPICS_OF_NAMESPACE_RESPONSE:
+            return "GET_TOPICS_OF_NAMESPACE_RESPONSE";
+            break;
     };
 }
 
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index 53fb1bb247..d9b8589fe2 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -112,6 +112,7 @@ class Commands {
 
     static SharedBuffer newSeek(uint64_t consumerId, uint64_t requestId, const 
MessageId& messageId);
     static SharedBuffer newGetLastMessageId(uint64_t consumerId, uint64_t 
requestId);
+    static SharedBuffer newGetTopicsOfNamespace(const std::string& nsName, 
uint64_t requestId);
 
    private:
     Commands();
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc 
b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index 0c145c1cf7..058ca57e1d 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -105,4 +105,10 @@ bool ConsumerConfiguration::isReadCompacted() const { 
return impl_->readCompacte
 
 void ConsumerConfiguration::setReadCompacted(bool compacted) { 
impl_->readCompacted = compacted; }
 
+void ConsumerConfiguration::setPatternAutoDiscoveryPeriod(int periodInSeconds) 
{
+    impl_->patternAutoDiscoveryPeriod = periodInSeconds;
+}
+
+int ConsumerConfiguration::getPatternAutoDiscoveryPeriod() const { return 
impl_->patternAutoDiscoveryPeriod; }
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h 
b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index eb0c3746bf..0cc0c72838 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -35,6 +35,7 @@ struct ConsumerConfigurationImpl {
     CryptoKeyReaderPtr cryptoKeyReader;
     ConsumerCryptoFailureAction cryptoFailureAction;
     bool readCompacted;
+    int patternAutoDiscoveryPeriod;
     ConsumerConfigurationImpl()
         : unAckedMessagesTimeoutMs(0),
           consumerType(ConsumerExclusive),
@@ -45,7 +46,8 @@ struct ConsumerConfigurationImpl {
           maxTotalReceiverQueueSizeAcrossPartitions(50000),
           cryptoKeyReader(),
           cryptoFailureAction(ConsumerCryptoFailureAction::FAIL),
-          readCompacted(false) {}
+          readCompacted(false),
+          patternAutoDiscoveryPeriod(60) {}
 };
 }  // namespace pulsar
 #endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */
diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc 
b/pulsar-client-cpp/lib/HTTPLookupService.cc
index 36d11f5954..fe27e8d9f3 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.cc
+++ b/pulsar-client-cpp/lib/HTTPLookupService.cc
@@ -68,8 +68,9 @@ Future<Result, LookupDataResultPtr> 
HTTPLookupService::lookupAsync(const std::st
                           << topicName->getEncodedLocalName();
     }
 
-    
executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::sendHTTPRequest,
 shared_from_this(),
-                                                   promise, 
completeUrlStream.str(), Lookup));
+    
executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::handleLookupHTTPRequest,
+                                                   shared_from_this(), 
promise, completeUrlStream.str(),
+                                                   Lookup));
     return promise.getFuture();
 }
 
@@ -89,8 +90,27 @@ Future<Result, LookupDataResultPtr> 
HTTPLookupService::getPartitionMetadataAsync
                           << '/' << PARTITION_METHOD_NAME;
     }
 
-    
executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::sendHTTPRequest,
 shared_from_this(),
-                                                   promise, 
completeUrlStream.str(), PartitionMetaData));
+    
executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::handleLookupHTTPRequest,
+                                                   shared_from_this(), 
promise, completeUrlStream.str(),
+                                                   PartitionMetaData));
+    return promise.getFuture();
+}
+
+Future<Result, NamespaceTopicsPtr> 
HTTPLookupService::getTopicsOfNamespaceAsync(
+    const NamespaceNamePtr &nsName) {
+    NamespaceTopicsPromise promise;
+    std::stringstream completeUrlStream;
+
+    if (nsName->isV2()) {
+        completeUrlStream << adminUrl_ << ADMIN_PATH_V2 << "namespaces" << '/' 
<< nsName->toString() << '/'
+                          << "topics";
+    } else {
+        completeUrlStream << adminUrl_ << ADMIN_PATH_V1 << "namespaces" << '/' 
<< nsName->toString() << '/'
+                          << "destinations";
+    }
+
+    
executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::handleNamespaceTopicsHTTPRequest,
+                                                   shared_from_this(), 
promise, completeUrlStream.str()));
     return promise.getFuture();
 }
 
@@ -99,19 +119,28 @@ static size_t curlWriteCallback(void *contents, size_t 
size, size_t nmemb, void
     return size * nmemb;
 }
 
-void HTTPLookupService::sendHTTPRequest(LookupPromise promise, const 
std::string completeUrl,
-                                        RequestType requestType) {
+void 
HTTPLookupService::handleNamespaceTopicsHTTPRequest(NamespaceTopicsPromise 
promise,
+                                                         const std::string 
completeUrl) {
+    std::string responseData;
+    Result result = sendHTTPRequest(completeUrl, responseData);
+
+    if (result != ResultOk) {
+        promise.setFailed(result);
+    } else {
+        promise.setValue(parseNamespaceTopicsData(responseData));
+    }
+}
+
+Result HTTPLookupService::sendHTTPRequest(const std::string completeUrl, 
std::string &responseData) {
     CURL *handle;
     CURLcode res;
-    std::string responseData;
     std::string version = std::string("Pulsar-CPP-v") + _PULSAR_VERSION_;
     handle = curl_easy_init();
 
     if (!handle) {
         LOG_ERROR("Unable to curl_easy_init for url " << completeUrl);
-        promise.setFailed(ResultLookupError);
         // No curl_easy_cleanup required since handle not initialized
-        return;
+        return ResultLookupError;
     }
     // set URL
     curl_easy_setopt(handle, CURLOPT_URL, completeUrl.c_str());
@@ -148,9 +177,8 @@ void HTTPLookupService::sendHTTPRequest(LookupPromise 
promise, const std::string
             "All Authentication methods should have AuthenticationData and 
return true on getAuthData for "
             "url "
             << completeUrl);
-        promise.setFailed(authResult);
         curl_easy_cleanup(handle);
-        return;
+        return authResult;
     }
     struct curl_slist *list = NULL;
     if (authDataContent->hasDataForHttp()) {
@@ -158,7 +186,7 @@ void HTTPLookupService::sendHTTPRequest(LookupPromise 
promise, const std::string
     }
     curl_easy_setopt(handle, CURLOPT_HTTPHEADER, list);
 
-    LOG_INFO("Curl Lookup Request sent for" << completeUrl);
+    LOG_INFO("Curl Lookup Request sent for " << completeUrl);
 
     // Make get call to server
     res = curl_easy_perform(handle);
@@ -166,16 +194,17 @@ void HTTPLookupService::sendHTTPRequest(LookupPromise 
promise, const std::string
     // Free header list
     curl_slist_free_all(list);
 
+    Result retResult = ResultOk;
+
     switch (res) {
         case CURLE_OK:
             long response_code;
             curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &response_code);
             LOG_INFO("Response received for url " << completeUrl << " code " 
<< response_code);
             if (response_code == 200) {
-                promise.setValue((requestType == PartitionMetaData) ? 
parsePartitionData(responseData)
-                                                                    : 
parseLookupData(responseData));
+                retResult = ResultOk;
             } else {
-                promise.setFailed(ResultLookupError);
+                retResult = ResultLookupError;
             }
             break;
         case CURLE_COULDNT_CONNECT:
@@ -183,22 +212,23 @@ void HTTPLookupService::sendHTTPRequest(LookupPromise 
promise, const std::string
         case CURLE_COULDNT_RESOLVE_HOST:
         case CURLE_HTTP_RETURNED_ERROR:
             LOG_ERROR("Response failed for url " << completeUrl << ". Error 
Code " << res);
-            promise.setFailed(ResultConnectError);
+            retResult = ResultConnectError;
             break;
         case CURLE_READ_ERROR:
             LOG_ERROR("Response failed for url " << completeUrl << ". Error 
Code " << res);
-            promise.setFailed(ResultReadError);
+            retResult = ResultReadError;
             break;
         case CURLE_OPERATION_TIMEDOUT:
             LOG_ERROR("Response failed for url " << completeUrl << ". Error 
Code " << res);
-            promise.setFailed(ResultTimeout);
+            retResult = ResultTimeout;
             break;
         default:
             LOG_ERROR("Response failed for url " << completeUrl << ". Error 
Code " << res);
-            promise.setFailed(ResultLookupError);
+            retResult = ResultLookupError;
             break;
     }
     curl_easy_cleanup(handle);
+    return retResult;
 }
 
 LookupDataResultPtr HTTPLookupService::parsePartitionData(const std::string 
&json) {
@@ -243,4 +273,48 @@ LookupDataResultPtr 
HTTPLookupService::parseLookupData(const std::string &json)
     LOG_INFO("parseLookupData = " << *lookupDataResultPtr);
     return lookupDataResultPtr;
 }
+
+/**
+ * Parses the JSON body of a "topics of namespace" response.
+ *
+ * Reads the entries of the "topics" member as strings, cuts each name at the
+ * first "-partition-" so that all partitions of a topic collapse into one
+ * entry, and returns the resulting names without duplicates.
+ *
+ * @param json raw response body
+ * @return the topic names, or a null NamespaceTopicsPtr when `json` cannot be
+ *         parsed (an error is logged in that case)
+ */
+NamespaceTopicsPtr HTTPLookupService::parseNamespaceTopicsData(const std::string &json) {
+    Json::Value root;
+    Json::Reader reader;
+    if (!reader.parse(json, root, false)) {
+        LOG_ERROR("Failed to parse json of Topics of Namespace: " << reader.getFormatedErrorMessages()
+                                                                  << "\nInput Json = " << json);
+        return NamespaceTopicsPtr();
+    }
+
+    Json::Value topicsArray = root["topics"];
+    // std::set drops duplicates on insert, so no lookup is needed beforehand.
+    std::set<std::string> topicSet;
+    for (Json::Value::ArrayIndex i = 0; i < topicsArray.size(); i++) {
+        const std::string topicName = topicsArray[i].asString();
+        // Keep the position as size_type: when there is no partition suffix
+        // find() yields npos, and substr(0, npos) returns the whole name.
+        const std::string::size_type pos = topicName.find("-partition-");
+        topicSet.insert(topicName.substr(0, pos));
+    }
+
+    return boost::make_shared<std::vector<std::string>>(topicSet.begin(), topicSet.end());
+}
+
+/**
+ * Fetches `completeUrl` through sendHTTPRequest() and completes `promise`.
+ * On ResultOk the body is parsed as partition metadata when `requestType` is
+ * PartitionMetaData and as lookup data otherwise; any other Result fails the
+ * promise with that Result.
+ */
+void HTTPLookupService::handleLookupHTTPRequest(LookupPromise promise, const std::string completeUrl,
+                                                RequestType requestType) {
+    std::string body;
+    const Result status = sendHTTPRequest(completeUrl, body);
+    if (status != ResultOk) {
+        promise.setFailed(status);
+        return;
+    }
+
+    if (requestType == PartitionMetaData) {
+        promise.setValue(parsePartitionData(body));
+    } else {
+        promise.setValue(parseLookupData(body));
+    }
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/HTTPLookupService.h 
b/pulsar-client-cpp/lib/HTTPLookupService.h
index d7fff331cb..66cd2514db 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.h
+++ b/pulsar-client-cpp/lib/HTTPLookupService.h
@@ -52,7 +52,12 @@ class HTTPLookupService : public LookupService, public 
boost::enable_shared_from
 
     static LookupDataResultPtr parsePartitionData(const std::string&);
     static LookupDataResultPtr parseLookupData(const std::string&);
-    void sendHTTPRequest(LookupPromise, const std::string, RequestType);
+    static NamespaceTopicsPtr parseNamespaceTopicsData(const std::string&);
+
+    void handleLookupHTTPRequest(LookupPromise, const std::string, 
RequestType);
+    void handleNamespaceTopicsHTTPRequest(NamespaceTopicsPromise promise, 
const std::string completeUrl);
+
+    Result sendHTTPRequest(const std::string completeUrl, std::string& 
responseData);
 
    public:
     HTTPLookupService(const std::string&, const ClientConfiguration&, const 
AuthenticationPtr&);
@@ -60,6 +65,8 @@ class HTTPLookupService : public LookupService, public 
boost::enable_shared_from
     Future<Result, LookupDataResultPtr> lookupAsync(const std::string&);
 
     Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const 
TopicNamePtr&);
+
+    Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const 
NamespaceNamePtr& nsName);
 };
 }  // namespace pulsar
 
diff --git a/pulsar-client-cpp/lib/LookupService.h 
b/pulsar-client-cpp/lib/LookupService.h
index 48d55954a9..122126326a 100644
--- a/pulsar-client-cpp/lib/LookupService.h
+++ b/pulsar-client-cpp/lib/LookupService.h
@@ -26,6 +26,10 @@
 #include <lib/TopicName.h>
 
 namespace pulsar {
+// Shared list of topic names, as produced by getTopicsOfNamespaceAsync().
+typedef boost::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
+// Promise completed with a Result and, on success, a NamespaceTopicsPtr.
+typedef Promise<Result, NamespaceTopicsPtr> NamespaceTopicsPromise;
+// Shared-pointer flavour of NamespaceTopicsPromise.
+typedef boost::shared_ptr<Promise<Result, NamespaceTopicsPtr>> 
NamespaceTopicsPromisePtr;
+
 class LookupService {
    public:
     /*
@@ -42,6 +46,13 @@ class LookupService {
      */
     virtual Future<Result, LookupDataResultPtr> 
getPartitionMetadataAsync(const TopicNamePtr& topicName) = 0;
 
+    /**
+     * @param   nsName - the namespace whose topics are listed
+     *
+     * Returns the names of all topics in the given namespace.
+     */
+    virtual Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const 
NamespaceNamePtr& nsName) = 0;
+
     virtual ~LookupService() {}
 };
 
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h 
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index 6425687595..3b1d9853a9 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -80,7 +80,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     // not supported
     virtual void seekAsync(const MessageId& msgId, ResultCallback callback);
 
-   private:
+   protected:
     const ClientImplPtr client_;
     const std::string subscriptionName_;
     std::string consumerStr_;
diff --git a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc 
b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
new file mode 100644
index 0000000000..95f8a36ec2
--- /dev/null
+++ b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "PatternMultiTopicsConsumerImpl.h"
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+// Builds a multi-topics consumer over `topics` and remembers `pattern` both as
+// text and as a compiled std::regex for later discovery rounds. The base class
+// receives TopicName::get(pattern) as its topic name. The auto discovery timer
+// is left unset here; start() creates it.
+PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl(
+    ClientImplPtr client, const std::string pattern, const std::vector<std::string>& topics,
+    const std::string& subscriptionName, const ConsumerConfiguration& conf,
+    const LookupServicePtr lookupServicePtr_)
+    : MultiTopicsConsumerImpl(client, topics, subscriptionName, TopicName::get(pattern), conf,
+                              lookupServicePtr_),
+      patternString_(pattern),
+      pattern_(pattern),
+      autoDiscoveryTimer_(),
+      autoDiscoveryRunning_(false) {}
+
+// Returns a copy of the compiled subscription pattern.
+const std::regex PatternMultiTopicsConsumerImpl::getPattern() {
+    return pattern_;
+}
+
+// Ends the current discovery round and schedules the next one: clears the
+// running flag, then re-arms the timer with the configured discovery period so
+// that autoDiscoveryTimerTask() is invoked again.
+// NOTE(review): the handler is bound to a raw `this`; presumably the consumer
+// outlives the timer - confirm against the owner of this object.
+void PatternMultiTopicsConsumerImpl::resetAutoDiscoveryTimer() {
+    autoDiscoveryRunning_ = false;
+
+    const auto period = seconds(conf_.getPatternAutoDiscoveryPeriod());
+    autoDiscoveryTimer_->expires_from_now(period);
+    autoDiscoveryTimer_->async_wait(
+        boost::bind(&PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask, this, _1));
+}
+
+/**
+ * Timer handler that starts one auto discovery round.
+ *
+ * Returns without doing anything when the wait was cancelled or failed, when
+ * the consumer has been closed, or when a round is still in flight. While the
+ * consumer is in any other non-Ready state the timer is simply re-armed.
+ * Otherwise the topics of the namespace are requested and handed to
+ * timerGetTopicsOfNamespace().
+ *
+ * @param err completion status of the timer wait
+ */
+void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const boost::system::error_code& err) {
+    if (err == boost::asio::error::operation_aborted) {
+        LOG_DEBUG(getName() << "Timer cancelled: " << err.message());
+        return;
+    } else if (err) {
+        LOG_ERROR(getName() << "Timer error: " << err.message());
+        return;
+    }
+
+    // A handler that was already queued when shutdown() cancelled the timer is
+    // still delivered with a success code. Re-arming here would keep a closed
+    // consumer polling forever, so stop instead.
+    if (state_ == Closed) {
+        LOG_DEBUG(getName() << "Consumer is closed, stopping auto discovery.");
+        return;
+    }
+
+    if (state_ != Ready) {
+        LOG_ERROR("Error in autoDiscoveryTimerTask consumer state not ready: " << state_);
+        resetAutoDiscoveryTimer();
+        return;
+    }
+
+    if (autoDiscoveryRunning_) {
+        LOG_DEBUG("autoDiscoveryTimerTask still running, cancel this running. ");
+        return;
+    }
+
+    autoDiscoveryRunning_ = true;
+
+    // already get namespace from pattern.
+    assert(namespaceName_);
+
+    lookupServicePtr_->getTopicsOfNamespaceAsync(namespaceName_)
+        .addListener(boost::bind(&PatternMultiTopicsConsumerImpl::timerGetTopicsOfNamespace, this, _1, _2));
+}
+
+/**
+ * Listener for the getTopicsOfNamespaceAsync() call issued by
+ * autoDiscoveryTimerTask().
+ *
+ * Filters the reported topics with the subscription pattern, compares them
+ * with the topics currently held in topicsPartitions_, subscribes to the new
+ * ones and, once that succeeded, unsubscribes from the ones that are gone.
+ * The timer is re-armed when the lookup fails or when the subscribe /
+ * unsubscribe callbacks report back.
+ *
+ * @param result outcome of the namespace lookup
+ * @param topics topics of the namespace; only used when `result` is ResultOk
+ */
+void PatternMultiTopicsConsumerImpl::timerGetTopicsOfNamespace(const Result result,
+                                                               const NamespaceTopicsPtr topics) {
+    if (result != ResultOk) {
+        LOG_ERROR("Error in Getting topicsOfNameSpace. result: " << result);
+        resetAutoDiscoveryTimer();
+        return;
+    }
+
+    // A lookup may report ResultOk and still carry no list (the HTTP lookup
+    // hands back a null pointer when the response body is not valid JSON).
+    if (!topics) {
+        LOG_ERROR("Error in Getting topicsOfNameSpace. result is Ok but the topic list is null");
+        resetAutoDiscoveryTimer();
+        return;
+    }
+
+    NamespaceTopicsPtr newTopics = PatternMultiTopicsConsumerImpl::topicsPatternFilter(*topics, pattern_);
+    // get old topics in consumer:
+    NamespaceTopicsPtr oldTopics = boost::make_shared<std::vector<std::string>>();
+    for (std::map<std::string, int>::iterator it = topicsPartitions_.begin(); it != topicsPartitions_.end();
+         it++) {
+        oldTopics->push_back(it->first);
+    }
+    NamespaceTopicsPtr topicsAdded = topicsListsMinus(*newTopics, *oldTopics);
+    NamespaceTopicsPtr topicsRemoved = topicsListsMinus(*oldTopics, *newTopics);
+
+    // callback method when removed topics all un-subscribed.
+    ResultCallback topicsRemovedCallback = [this](Result result) {
+        if (result != ResultOk) {
+            LOG_ERROR("Failed to unsubscribe topics: " << result);
+        }
+        resetAutoDiscoveryTimer();
+    };
+
+    // callback method when added topics all subscribed.
+    ResultCallback topicsAddedCallback = [this, topicsRemoved, topicsRemovedCallback](Result result) {
+        if (result == ResultOk) {
+            // call to unsubscribe all removed topics.
+            onTopicsRemoved(topicsRemoved, topicsRemovedCallback);
+        } else {
+            resetAutoDiscoveryTimer();
+        }
+    };
+
+    // call to subscribe new added topics, then in its callback do unsubscribe
+    onTopicsAdded(topicsAdded, topicsAddedCallback);
+}
+
+// Subscribes to every topic in `addedTopics`. With an empty list `callback`
+// is invoked right away with ResultOk; otherwise each subscription reports to
+// handleOneTopicAdded(), which shares one countdown of outstanding topics.
+void PatternMultiTopicsConsumerImpl::onTopicsAdded(NamespaceTopicsPtr addedTopics, ResultCallback callback) {
+    if (addedTopics->empty()) {
+        LOG_DEBUG("no topics need subscribe");
+        callback(ResultOk);
+        return;
+    }
+
+    // number of subscriptions that have not reported back yet
+    const int outstanding = addedTopics->size();
+    boost::shared_ptr<std::atomic<int>> topicsNeedCreate =
+        boost::make_shared<std::atomic<int>>(outstanding);
+
+    for (const std::string& topic : *addedTopics) {
+        MultiTopicsConsumerImpl::subscribeOneTopicAsync(topic).addListener(
+            boost::bind(&PatternMultiTopicsConsumerImpl::handleOneTopicAdded, this, _1, topic,
+                        topicsNeedCreate, callback));
+    }
+}
+
+/**
+ * Completion handler for one subscription started by onTopicsAdded().
+ *
+ * Decrements the shared countdown. A failed subscription reports its error
+ * through `callback` immediately; a successful one invokes `callback` only
+ * when it was the last outstanding topic.
+ *
+ * NOTE(review): with several failing topics, or a failure followed by a
+ * successful last topic, `callback` still runs more than once - fixing that
+ * needs shared state beyond the counter.
+ *
+ * @param result           outcome of subscribing to `topic`
+ * @param topic            topic that was subscribed
+ * @param topicsNeedCreate countdown of subscriptions still outstanding
+ * @param callback         invoked with the overall outcome
+ */
+void PatternMultiTopicsConsumerImpl::handleOneTopicAdded(const Result result, const std::string& topic,
+                                                         boost::shared_ptr<std::atomic<int>> topicsNeedCreate,
+                                                         ResultCallback callback) {
+    const int previous = topicsNeedCreate->fetch_sub(1);
+    assert(previous > 0);
+
+    if (result != ResultOk) {
+        LOG_ERROR("Failed when subscribed to topic " << topic << "  Error - " << result);
+        callback(result);
+        return;
+    }
+
+    // Decide from the value fetch_sub() returned: a separate load() could show
+    // 0 to two concurrent handlers and fire the callback twice.
+    if (previous == 1) {
+        LOG_DEBUG("Subscribed all new added topics");
+        callback(result);
+    }
+}
+
+/**
+ * Unsubscribes from every topic in `removedTopics`.
+ *
+ * With an empty list `callback` is invoked right away with ResultOk.
+ * Otherwise each unsubscribe decrements a shared countdown; a failure is
+ * reported through `callback` immediately, and success is reported once by
+ * the unsubscribe that finishes last.
+ *
+ * NOTE(review): as with subscriptions, more than one failing topic still
+ * leads to more than one `callback` invocation.
+ *
+ * @param removedTopics topics to drop from this consumer
+ * @param callback      invoked with the overall outcome
+ */
+void PatternMultiTopicsConsumerImpl::onTopicsRemoved(NamespaceTopicsPtr removedTopics,
+                                                     ResultCallback callback) {
+    // start call unsubscribeOneTopicAsync for each single topic
+    if (removedTopics->empty()) {
+        LOG_DEBUG("no topics need unsubscribe");
+        callback(ResultOk);
+        return;
+    }
+    int topicsNumber = removedTopics->size();
+
+    boost::shared_ptr<std::atomic<int>> topicsNeedUnsub = boost::make_shared<std::atomic<int>>(topicsNumber);
+    ResultCallback oneTopicUnsubscribedCallback = [this, topicsNeedUnsub, callback](Result result) {
+        const int previous = topicsNeedUnsub->fetch_sub(1);
+        assert(previous > 0);
+
+        if (result != ResultOk) {
+            LOG_ERROR("Failed when unsubscribe to one topic.  Error - " << result);
+            callback(result);
+            return;
+        }
+
+        // Use the value fetch_sub() returned; re-reading the counter could let
+        // two concurrent completions both observe 0.
+        if (previous == 1) {
+            LOG_DEBUG("unSubscribed all needed topics");
+            callback(result);
+        }
+    };
+
+    // unsubscribe for each passed in topic
+    for (std::vector<std::string>::const_iterator itr = removedTopics->begin(); itr != removedTopics->end();
+         itr++) {
+        MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(*itr, oneTopicUnsubscribedCallback);
+    }
+}
+
+// Returns, in their original order, the entries of `topics` that `pattern`
+// matches as a whole (std::regex_match).
+NamespaceTopicsPtr PatternMultiTopicsConsumerImpl::topicsPatternFilter(const std::vector<std::string>& topics,
+                                                                       const std::regex& pattern) {
+    NamespaceTopicsPtr matched = boost::make_shared<std::vector<std::string>>();
+
+    for (const std::string& name : topics) {
+        if (!std::regex_match(name, pattern)) {
+            continue;
+        }
+        matched->push_back(name);
+    }
+    return matched;
+}
+
+// Returns the entries of `list1` that do not occur in `list2`, keeping the
+// order (and any duplicates) of `list1`. Each entry is looked up in `list2`
+// with a linear search.
+NamespaceTopicsPtr PatternMultiTopicsConsumerImpl::topicsListsMinus(std::vector<std::string>& list1,
+                                                                    std::vector<std::string>& list2) {
+    NamespaceTopicsPtr difference = boost::make_shared<std::vector<std::string>>();
+
+    for (const std::string& candidate : list1) {
+        const bool alsoInList2 = std::find(list2.begin(), list2.end(), candidate) != list2.end();
+        if (!alsoInList2) {
+            difference->push_back(candidate);
+        }
+    }
+
+    return difference;
+}
+
+// Starts the underlying multi-topics consumer and, the first time it is
+// called with a positive discovery period configured, creates the auto
+// discovery timer and arms it for one period.
+void PatternMultiTopicsConsumerImpl::start() {
+    MultiTopicsConsumerImpl::start();
+
+    LOG_DEBUG("PatternMultiTopicsConsumerImpl start autoDiscoveryTimer_.");
+
+    // Nothing more to do when the timer already exists or discovery is off.
+    if (autoDiscoveryTimer_ || !(conf_.getPatternAutoDiscoveryPeriod() > 0)) {
+        return;
+    }
+
+    // Init autoDiscoveryTimer task only once, wait for the timeout to happen
+    autoDiscoveryTimer_ = client_->getIOExecutorProvider()->get()->createDeadlineTimer();
+    const auto period = seconds(conf_.getPatternAutoDiscoveryPeriod());
+    autoDiscoveryTimer_->expires_from_now(period);
+    autoDiscoveryTimer_->async_wait(
+        boost::bind(&PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask, this, _1));
+}
+
+// Marks the consumer as Closed under the lock, stops auto discovery and fails
+// the pending creation promise with ResultAlreadyClosed.
+void PatternMultiTopicsConsumerImpl::shutdown() {
+    Lock lock(mutex_);
+    state_ = Closed;
+    // The timer only exists after start() ran with a positive discovery
+    // period, so it may legitimately be unset here.
+    if (autoDiscoveryTimer_) {
+        autoDiscoveryTimer_->cancel();
+    }
+    multiTopicsConsumerCreatedPromise_.setFailed(ResultAlreadyClosed);
+}
+
+// Closes the underlying multi-topics consumer (reporting through `callback`)
+// and cancels the auto discovery timer.
+void PatternMultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
+    MultiTopicsConsumerImpl::closeAsync(callback);
+    // The timer only exists after start() ran with a positive discovery
+    // period, so guard against it being unset.
+    if (autoDiscoveryTimer_) {
+        autoDiscoveryTimer_->cancel();
+    }
+}
diff --git a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.h 
b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.h
new file mode 100644
index 0000000000..503dc993f3
--- /dev/null
+++ b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.h
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PULSAR_PATTERN_MULTI_TOPICS_CONSUMER_HEADER
+#define PULSAR_PATTERN_MULTI_TOPICS_CONSUMER_HEADER
+#include "ConsumerImpl.h"
+#include "ClientImpl.h"
+#include <regex>
+#include "boost/enable_shared_from_this.hpp"
+#include <lib/TopicName.h>
+#include <lib/NamespaceName.h>
+#include "MultiTopicsConsumerImpl.h"
+
+namespace pulsar {
+
+class PatternMultiTopicsConsumerImpl;
+
+// Multi-topics consumer whose topic set is described by a regular expression.
+// A deadline timer periodically asks the lookup service for the topics of the
+// namespace and subscribes / unsubscribes so the set follows the pattern.
+class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl {
+   public:
+    // currently we support topics under same namespace, so `patternString` is 
a regex,
+    // which only contains after namespace part.
+    // when subscribe, client will first get all topics that match given 
pattern.
+    // `topics` contains the topics that match `patternString`.
+    PatternMultiTopicsConsumerImpl(ClientImplPtr client, const std::string 
patternString,
+                                   const std::vector<std::string>& topics,
+                                   const std::string& subscriptionName, const 
ConsumerConfiguration& conf,
+                                   const LookupServicePtr lookupServicePtr_);
+
+    // Returns a copy of the compiled pattern.
+    const std::regex getPattern();
+
+    // Timer handler: runs one discovery round unless the wait was cancelled or failed.
+    void autoDiscoveryTimerTask(const boost::system::error_code& err);
+
+    // filter input `topics` with given `pattern`, return matched topics
+    static NamespaceTopicsPtr topicsPatternFilter(const 
std::vector<std::string>& topics,
+                                                  const std::regex& pattern);
+
+    // Find out topics, which are in `list1` but not in `list2`.
+    static NamespaceTopicsPtr topicsListsMinus(std::vector<std::string>& list1,
+                                               std::vector<std::string>& 
list2);
+
+    // Lifecycle overrides; besides the base behaviour they create (start) or
+    // cancel (closeAsync, shutdown) the auto discovery timer.
+    virtual void closeAsync(ResultCallback callback);
+    virtual void start();
+    virtual void shutdown();
+
+   private:
+    // Pattern as passed in, and its compiled form.
+    const std::string patternString_;
+    const std::regex pattern_;
+    typedef boost::shared_ptr<boost::asio::deadline_timer> TimerPtr;
+    // Unset until start() creates it.
+    TimerPtr autoDiscoveryTimer_;
+    // True while a discovery round is in flight.
+    bool autoDiscoveryRunning_;
+
+    // Clears the running flag and re-arms the timer for the next round.
+    void resetAutoDiscoveryTimer();
+    // Listener for the namespace lookup started by autoDiscoveryTimerTask().
+    void timerGetTopicsOfNamespace(const Result result, const 
NamespaceTopicsPtr topics);
+    // Subscribe to / unsubscribe from the given topics, then invoke `callback`.
+    void onTopicsAdded(NamespaceTopicsPtr addedTopics, ResultCallback 
callback);
+    void onTopicsRemoved(NamespaceTopicsPtr removedTopics, ResultCallback 
callback);
+    // Completion handler for one subscription started by onTopicsAdded().
+    void handleOneTopicAdded(const Result result, const std::string& topic,
+                             boost::shared_ptr<std::atomic<int>> 
topicsNeedCreate, ResultCallback callback);
+};
+
+}  // namespace pulsar
+#endif  // PULSAR_PATTERN_MULTI_TOPICS_CONSUMER_HEADER
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc 
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index cf28b34130..d4c1df80c0 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -34,6 +34,7 @@
 #include <set>
 #include <vector>
 #include <lib/MultiTopicsConsumerImpl.h>
+#include <lib/PatternMultiTopicsConsumerImpl.h>
 #include "lib/Future.h"
 #include "lib/Utils.h"
 DECLARE_LOG_OBJECT()
@@ -1679,3 +1680,259 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerPubSub) {
 
     client.shutdown();
 }
+
+// Subscribing with a pattern whose domain is not valid must be rejected with
+// ResultInvalidTopicName.
+TEST(BasicEndToEndTest, testPatternTopicsConsumerInvalid) {
+    Client client(lookupUrl);
+
+    // invalid namespace
+    const std::string invalidPattern = "invalidDomain://prop/unit/ns/patternMultiTopicsConsumerInvalid.*";
+    const std::string subscription = "testPatternMultiTopicsConsumerInvalid";
+
+    Promise<Result, Consumer> subscribed;
+    client.subscribeWithRegexAsync(invalidPattern, subscription,
+                                   WaitForCallbackValue<Consumer>(subscribed));
+
+    // block until the subscribe callback has fired, then check its result
+    Consumer consumer;
+    Future<Result, Consumer> pending = subscribed.getFuture();
+    ASSERT_EQ(ResultInvalidTopicName, pending.get(consumer));
+
+    client.shutdown();
+}
+
+// create 4 topics, in which 3 topics match the pattern,
+// verify PatternMultiTopicsConsumer subscribed matched topics,
+// and only receive messages from matched topics.
+TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerPubSub) {
+    Client client(lookupUrl);
+    const std::string pattern = "persistent://prop/unit/ns1/patternMultiTopicsConsumer.*";
+    const std::string subName = "testPatternMultiTopicsConsumer";
+
+    // One row per topic: the first three match the pattern, the last does not.
+    const int topicCount = 4;
+    const std::string localNames[topicCount] = {
+        "patternMultiTopicsConsumerPubSub1", "patternMultiTopicsConsumerPubSub2",
+        "patternMultiTopicsConsumerPubSub3", "patternMultiTopicsNotMatchPubSub4"};
+    const char* partitionCounts[topicCount] = {"2", "3", "4", "4"};
+    const std::string contentPrefixes[topicCount] = {"msg-content", "msg-content2", "msg-content3",
+                                                     "msg-content4"};
+
+    // call admin api to make topics partitioned
+    for (int t = 0; t < topicCount; t++) {
+        const std::string url = adminUrl + "admin/persistent/prop/unit/ns1/" + localNames[t] + "/partitions";
+        const int res = makePutRequest(url, partitionCounts[t]);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    Producer producers[topicCount];
+    for (int t = 0; t < topicCount; t++) {
+        const std::string topicName = "persistent://prop/unit/ns1/" + localNames[t];
+        ASSERT_EQ(ResultOk, client.createProducer(topicName, producers[t]));
+    }
+
+    LOG_INFO("created 3 producers that match, with partitions: 2, 3, 4, and 1 producer not match");
+
+    const int messageNumber = 100;
+    ConsumerConfiguration consConfig;
+    consConfig.setConsumerType(ConsumerShared);
+    consConfig.setReceiverQueueSize(10);  // size for each sub-consumer
+    Consumer consumer;
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeWithRegexAsync(pattern, subName, consConfig,
+                                   WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    ASSERT_EQ(ResultOk, consumerFuture.get(consumer));
+    ASSERT_EQ(consumer.getSubscriptionName(), subName);
+    LOG_INFO("created topics consumer on a pattern that match 3 topics");
+
+    // every producer publishes `messageNumber` messages, one producer after the other
+    for (int t = 0; t < topicCount; t++) {
+        LOG_INFO("Publishing 100 messages by producer " << (t + 1) << " synchronously");
+        for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+            std::stringstream stream;
+            stream << contentPrefixes[t] << msgNum;
+            Message msg = MessageBuilder().setContent(stream.str()).build();
+            ASSERT_EQ(ResultOk, producers[t].send(msg));
+        }
+    }
+
+    LOG_INFO("Consuming and acking 300 messages by multiTopicsConsumer");
+    for (int received = 0; received < 3 * messageNumber; received++) {
+        Message incoming;
+        ASSERT_EQ(ResultOk, consumer.receive(incoming, 1000));
+        ASSERT_EQ(ResultOk, consumer.acknowledge(incoming));
+    }
+    LOG_INFO("Consumed and acked 300 messages by multiTopicsConsumer");
+
+    // verify no more to receive, because producer4 not match pattern
+    Message leftover;
+    ASSERT_EQ(ResultTimeout, consumer.receive(leftover, 1000));
+
+    ASSERT_EQ(ResultOk, consumer.unsubscribe());
+
+    client.shutdown();
+}
+
+// create a pattern consumer, which contains no match topics at beginning.
+// create 4 topics, in which 3 topics match the pattern.
+// verify PatternMultiTopicsConsumer subscribed matched topics, after a while,
+// and only receive messages from matched topics.
+TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerAutoDiscovery) {
+    Client client(lookupUrl);
+    const std::string pattern = "persistent://prop/unit/ns2/patternTopicsAutoConsumer.*";
+    const std::string subName = "testPatternTopicsAutoConsumer";
+
+    // 1.  create a pattern consumer, which contains no match topics at beginning.
+    ConsumerConfiguration consConfig;
+    consConfig.setConsumerType(ConsumerShared);
+    consConfig.setReceiverQueueSize(10);          // size for each sub-consumer
+    consConfig.setPatternAutoDiscoveryPeriod(1);  // set waiting time for auto discovery
+    Consumer consumer;
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeWithRegexAsync(pattern, subName, consConfig,
+                                   WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    ASSERT_EQ(ResultOk, consumerFuture.get(consumer));
+    ASSERT_EQ(consumer.getSubscriptionName(), subName);
+    LOG_INFO("created pattern consumer with not match topics at beginning");
+
+    // 2. create 4 topics, in which 3 match the pattern.
+    // One row per topic: the first three match the pattern, the last does not.
+    const int topicCount = 4;
+    const std::string localNames[topicCount] = {
+        "patternTopicsAutoConsumerPubSub1", "patternTopicsAutoConsumerPubSub2",
+        "patternTopicsAutoConsumerPubSub3", "patternMultiTopicsNotMatchPubSub4"};
+    const char* partitionCounts[topicCount] = {"2", "3", "4", "4"};
+    const std::string contentPrefixes[topicCount] = {"msg-content", "msg-content2", "msg-content3",
+                                                     "msg-content4"};
+
+    // call admin api to make topics partitioned
+    for (int t = 0; t < topicCount; t++) {
+        const std::string url = adminUrl + "admin/persistent/prop/unit/ns2/" + localNames[t] + "/partitions";
+        const int res = makePutRequest(url, partitionCounts[t]);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    Producer producers[topicCount];
+    for (int t = 0; t < topicCount; t++) {
+        const std::string topicName = "persistent://prop/unit/ns2/" + localNames[t];
+        ASSERT_EQ(ResultOk, client.createProducer(topicName, producers[t]));
+    }
+    LOG_INFO("created 3 producers that match, with partitions: 2, 3, 4, and 1 producer not match");
+
+    // 3. wait enough time to trigger auto discovery
+    usleep(2 * 1000 * 1000);
+
+    // 4. produce data.
+    const int messageNumber = 100;
+    for (int t = 0; t < topicCount; t++) {
+        LOG_INFO("Publishing 100 messages by producer " << (t + 1) << " synchronously");
+        for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+            std::stringstream stream;
+            stream << contentPrefixes[t] << msgNum;
+            Message msg = MessageBuilder().setContent(stream.str()).build();
+            ASSERT_EQ(ResultOk, producers[t].send(msg));
+        }
+    }
+
+    // 5. pattern consumer already subscribed 3 topics
+    LOG_INFO("Consuming and acking 300 messages by pattern topics consumer");
+    for (int received = 0; received < 3 * messageNumber; received++) {
+        Message incoming;
+        ASSERT_EQ(ResultOk, consumer.receive(incoming, 1000));
+        ASSERT_EQ(ResultOk, consumer.acknowledge(incoming));
+    }
+    LOG_INFO("Consumed and acked 300 messages by pattern topics consumer");
+
+    // verify no more to receive, because producer4 not match pattern
+    Message leftover;
+    ASSERT_EQ(ResultTimeout, consumer.receive(leftover, 1000));
+
+    ASSERT_EQ(ResultOk, consumer.unsubscribe());
+
+    client.shutdown();
+}
\ No newline at end of file
diff --git a/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc 
b/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc
index 67e6af53e9..f706eb8575 100644
--- a/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc
+++ b/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc
@@ -25,9 +25,12 @@
 #include <Future.h>
 #include <Utils.h>
 #include "ConnectionPool.h"
+#include "HttpHelper.h"
 #include <pulsar/Authentication.h>
 #include <boost/exception/all.hpp>
 
+DECLARE_LOG_OBJECT()
+
 using namespace pulsar;
 
 TEST(BinaryLookupServiceTest, basicLookup) {
@@ -56,3 +59,67 @@ TEST(BinaryLookupServiceTest, basicLookup) {
     ASSERT_TRUE(lookupData != NULL);
     ASSERT_EQ(url, lookupData->getBrokerUrl());
 }
+
+/**
+ * Checks BinaryProtoLookupService::getTopicsOfNamespaceAsync: after creating
+ * three partitioned topics under prop/unit/ns4 and one topic under
+ * prop/unit/ns2, a lookup of the ns4 namespace must return exactly the three
+ * ns4 topics.
+ *
+ * Talks to the broker at pulsar://localhost:8885 and to the admin endpoint at
+ * http://localhost:8765/.
+ */
+TEST(BinaryLookupServiceTest, basicGetNamespaceTopics) {
+    std::string url = "pulsar://localhost:8885";
+    std::string adminUrl = "http://localhost:8765/";
+    Result result;
+    // 1. create some topics under same namespace
+    Client client(url);
+
+    std::string topicName1 = "persistent://prop/unit/ns4/basicGetNamespaceTopics1";
+    std::string topicName2 = "persistent://prop/unit/ns4/basicGetNamespaceTopics2";
+    std::string topicName3 = "persistent://prop/unit/ns4/basicGetNamespaceTopics3";
+    // This is not in same namespace.
+    std::string topicName4 = "persistent://prop/unit/ns2/basicGetNamespaceTopics4";
+
+    // call admin api to make topics partitioned
+    std::string url1 = adminUrl + "admin/persistent/prop/unit/ns4/basicGetNamespaceTopics1/partitions";
+    std::string url2 = adminUrl + "admin/persistent/prop/unit/ns4/basicGetNamespaceTopics2/partitions";
+    std::string url3 = adminUrl + "admin/persistent/prop/unit/ns4/basicGetNamespaceTopics3/partitions";
+
+    // Both 204 and 409 are accepted (presumably "created" and "already
+    // exists", so the test can be re-run against the same broker -- confirm).
+    int res = makePutRequest(url1, "2");
+    ASSERT_TRUE(res == 204 || res == 409);
+    res = makePutRequest(url2, "3");
+    ASSERT_TRUE(res == 204 || res == 409);
+    res = makePutRequest(url3, "4");
+    ASSERT_TRUE(res == 204 || res == 409);
+
+    Producer producer1;
+    result = client.createProducer(topicName1, producer1);
+    ASSERT_EQ(ResultOk, result);
+    Producer producer2;
+    result = client.createProducer(topicName2, producer2);
+    ASSERT_EQ(ResultOk, result);
+    Producer producer3;
+    result = client.createProducer(topicName3, producer3);
+    ASSERT_EQ(ResultOk, result);
+    Producer producer4;
+    result = client.createProducer(topicName4, producer4);
+    ASSERT_EQ(ResultOk, result);
+
+    // 2.  call getTopicsOfNamespaceAsync
+    // The lookup service gets its own connection pool, separate from `client`.
+    AuthenticationPtr authData = AuthFactory::Disabled();
+    ClientConfiguration conf;
+    ExecutorServiceProviderPtr ioExecutorProvider_(boost::make_shared<ExecutorServiceProvider>(1));
+    ConnectionPool pool_(conf, ioExecutorProvider_, authData, true);
+    BinaryProtoLookupService lookupService(pool_, url);
+
+    // Derive the namespace to query from one of the ns4 topic names.
+    TopicNamePtr topicName = TopicName::get(topicName1);
+    NamespaceNamePtr nsName = topicName->getNamespaceName();
+
+    Future<Result, NamespaceTopicsPtr> getTopicsFuture = lookupService.getTopicsOfNamespaceAsync(nsName);
+    NamespaceTopicsPtr topicsData;
+    result = getTopicsFuture.get(topicsData);
+    ASSERT_EQ(ResultOk, result);
+    ASSERT_TRUE(topicsData != NULL);
+
+    // 3. verify result contains first 3 topic
+    // Exactly three entries, all of them found below, so topicName4 is absent.
+    ASSERT_EQ(3u, topicsData->size());
+    ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName1) != topicsData->end());
+    ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName2) != topicsData->end());
+    ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName3) != topicsData->end());
+
+    client.shutdown();
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to