jiazhai closed pull request #2219: Cpp client: add PatternMultiTopicsConsumerImpl to support regex subscribe URL: https://github.com/apache/incubator-pulsar/pull/2219
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-client-cpp/include/pulsar/Client.h b/pulsar-client-cpp/include/pulsar/Client.h index 6a9e4878b7..1913851323 100644 --- a/pulsar-client-cpp/include/pulsar/Client.h +++ b/pulsar-client-cpp/include/pulsar/Client.h @@ -99,6 +99,9 @@ class Client { void subscribeAsync(const std::string& topic, const std::string& consumerName, const ConsumerConfiguration& conf, SubscribeCallback callback); + /** + * subscribe for multiple topics under the same namespace. + */ Result subscribe(const std::vector<std::string>& topics, const std::string& subscriptionName, Consumer& consumer); Result subscribe(const std::vector<std::string>& topics, const std::string& subscriptionName, @@ -108,6 +111,19 @@ class Client { void subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName, const ConsumerConfiguration& conf, SubscribeCallback callback); + /** + * subscribe for multiple topics, which match given regexPattern, under the same namespace. + */ + Result subscribeWithRegex(const std::string& regexPattern, const std::string& consumerName, + Consumer& consumer); + Result subscribeWithRegex(const std::string& regexPattern, const std::string& consumerName, + const ConsumerConfiguration& conf, Consumer& consumer); + + void subscribeWithRegexAsync(const std::string& regexPattern, const std::string& consumerName, + SubscribeCallback callback); + void subscribeWithRegexAsync(const std::string& regexPattern, const std::string& consumerName, + const ConsumerConfiguration& conf, SubscribeCallback callback); + /** * Create a topic reader with given {@code ReaderConfiguration} for reading messages from the specified * topic. 
diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h index c9584e38f2..36e580897b 100644 --- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h +++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h @@ -152,6 +152,16 @@ class ConsumerConfiguration { bool isReadCompacted() const; void setReadCompacted(bool compacted); + /** + * Set the time duration in seconds, for which the PatternMultiTopicsConsumer will do a pattern auto + * discovery. + * The default value is 60 seconds. A value less than 0 will disable auto discovery. + * + * @param periodInSeconds period in seconds to do an auto discovery + */ + void setPatternAutoDiscoveryPeriod(int periodInSeconds); + int getPatternAutoDiscoveryPeriod() const; + friend class PulsarWrapper; private: diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc index c4bef3070d..296faee6cf 100644 --- a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc +++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc @@ -152,4 +152,46 @@ uint64_t BinaryProtoLookupService::newRequestId() { Lock lock(mutex_); return ++requestIdGenerator_; } + +Future<Result, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespaceAsync( + const NamespaceNamePtr& nsName) { + NamespaceTopicsPromisePtr promise = boost::make_shared<Promise<Result, NamespaceTopicsPtr>>(); + if (!nsName) { + promise->setFailed(ResultInvalidTopicName); + return promise->getFuture(); + } + std::string namespaceName = nsName->toString(); + Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(serviceUrl_, serviceUrl_); + future.addListener(boost::bind(&BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest, this, + namespaceName, _1, _2, promise)); + return promise->getFuture(); +} + +void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string& nsName, Result result, + const 
ClientConnectionWeakPtr& clientCnx, + NamespaceTopicsPromisePtr promise) { + if (result != ResultOk) { + promise->setFailed(ResultConnectError); + return; + } + + ClientConnectionPtr conn = clientCnx.lock(); + uint64_t requestId = newRequestId(); + LOG_DEBUG("sendGetTopicsOfNamespaceRequest. requestId: " << requestId << " nsName: " << nsName); + + conn->newGetTopicsOfNamespace(nsName, requestId) + .addListener( + boost::bind(&BinaryProtoLookupService::getTopicsOfNamespaceListener, this, _1, _2, promise)); +} + +void BinaryProtoLookupService::getTopicsOfNamespaceListener(Result result, NamespaceTopicsPtr topicsPtr, + NamespaceTopicsPromisePtr promise) { + if (result != ResultOk) { + promise->setFailed(ResultLookupError); + return; + } + + promise->setValue(topicsPtr); +} + } // namespace pulsar diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.h b/pulsar-client-cpp/lib/BinaryProtoLookupService.h index f647c626b4..36fb5e81d7 100644 --- a/pulsar-client-cpp/lib/BinaryProtoLookupService.h +++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.h @@ -40,6 +40,8 @@ class BinaryProtoLookupService : public LookupService { Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName); + Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName); + private: boost::mutex mutex_; uint64_t requestIdGenerator_; @@ -61,6 +63,13 @@ class BinaryProtoLookupService : public LookupService { const ClientConnectionWeakPtr& clientCnx, LookupDataResultPromisePtr promise); + void sendGetTopicsOfNamespaceRequest(const std::string& nsName, Result result, + const ClientConnectionWeakPtr& clientCnx, + NamespaceTopicsPromisePtr promise); + + void getTopicsOfNamespaceListener(Result result, NamespaceTopicsPtr topicsPtr, + NamespaceTopicsPromisePtr promise); + uint64_t newRequestId(); }; typedef boost::shared_ptr<BinaryProtoLookupService> BinaryProtoLookupServicePtr; diff --git a/pulsar-client-cpp/lib/Client.cc 
b/pulsar-client-cpp/lib/Client.cc index bba35203e1..5cfe01f18c 100644 --- a/pulsar-client-cpp/lib/Client.cc +++ b/pulsar-client-cpp/lib/Client.cc @@ -114,6 +114,30 @@ void Client::subscribeAsync(const std::vector<std::string>& topics, const std::s impl_->subscribeAsync(topics, subscriptionName, conf, callback); } +Result Client::subscribeWithRegex(const std::string& regexPattern, const std::string& subscriptionName, + Consumer& consumer) { + return subscribeWithRegex(regexPattern, subscriptionName, ConsumerConfiguration(), consumer); +} + +Result Client::subscribeWithRegex(const std::string& regexPattern, const std::string& subscriptionName, + const ConsumerConfiguration& conf, Consumer& consumer) { + Promise<Result, Consumer> promise; + subscribeWithRegexAsync(regexPattern, subscriptionName, conf, WaitForCallbackValue<Consumer>(promise)); + Future<Result, Consumer> future = promise.getFuture(); + + return future.get(consumer); +} + +void Client::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName, + SubscribeCallback callback) { + subscribeWithRegexAsync(regexPattern, subscriptionName, ConsumerConfiguration(), callback); +} + +void Client::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName, + const ConsumerConfiguration& conf, SubscribeCallback callback) { + impl_->subscribeWithRegexAsync(regexPattern, subscriptionName, conf, callback); +} + Result Client::createReader(const std::string& topic, const MessageId& startMessageId, const ReaderConfiguration& conf, Reader& reader) { Promise<Result, Reader> promise; diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index 4e6d0f23b6..da307079e6 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -219,7 +219,7 @@ void ClientConnection::handlePulsarConnected(const CommandConnected& cmdConnecte } void 
ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerStatsRequests) { - std::vector<Promise<Result, BrokerConsumerStatsImpl> > consumerStatsPromises; + std::vector<Promise<Result, BrokerConsumerStatsImpl>> consumerStatsPromises; Lock lock(mutex_); for (int i = 0; i < consumerStatsRequests.size(); i++) { @@ -856,6 +856,7 @@ void ClientConnection::handleIncomingCommand() { << " -- req_id: " << error.request_id()); Lock lock(mutex_); + PendingRequestsMap::iterator it = pendingRequests_.find(error.request_id()); if (it != pendingRequests_.end()) { PendingRequestData requestData = it->second; @@ -865,19 +866,28 @@ void ClientConnection::handleIncomingCommand() { requestData.promise.setFailed(getResult(error.error())); requestData.timer->cancel(); } else { - PendingGetLastMessageIdRequestsMap::iterator it2 = + PendingGetLastMessageIdRequestsMap::iterator it = pendingGetLastMessageIdRequests_.find(error.request_id()); - if (it2 != pendingGetLastMessageIdRequests_.end()) { - Promise<Result, MessageId> getLastMessageIdPromise = it2->second; - pendingGetLastMessageIdRequests_.erase(it2); + if (it != pendingGetLastMessageIdRequests_.end()) { + Promise<Result, MessageId> getLastMessageIdPromise = it->second; + pendingGetLastMessageIdRequests_.erase(it); lock.unlock(); getLastMessageIdPromise.setFailed(getResult(error.error())); } else { - lock.unlock(); + PendingGetNamespaceTopicsMap::iterator it = + pendingGetNamespaceTopicsRequests_.find(error.request_id()); + if (it != pendingGetNamespaceTopicsRequests_.end()) { + Promise<Result, NamespaceTopicsPtr> getNamespaceTopicsPromise = it->second; + pendingGetNamespaceTopicsRequests_.erase(it); + lock.unlock(); + + getNamespaceTopicsPromise.setFailed(getResult(error.error())); + } else { + lock.unlock(); + } } } - break; } @@ -978,6 +988,51 @@ void ClientConnection::handleIncomingCommand() { break; } + case BaseCommand::GET_TOPICS_OF_NAMESPACE_RESPONSE: { + const CommandGetTopicsOfNamespaceResponse& response = + 
incomingCmd_.gettopicsofnamespaceresponse(); + + LOG_DEBUG(cnxString_ << "Received GetTopicsOfNamespaceResponse from server. req_id: " + << response.request_id() << " topicsSize" << response.topics_size()); + + Lock lock(mutex_); + PendingGetNamespaceTopicsMap::iterator it = + pendingGetNamespaceTopicsRequests_.find(response.request_id()); + + if (it != pendingGetNamespaceTopicsRequests_.end()) { + Promise<Result, NamespaceTopicsPtr> getTopicsPromise = it->second; + pendingGetNamespaceTopicsRequests_.erase(it); + lock.unlock(); + + int numTopics = response.topics_size(); + std::set<std::string> topicSet; + // get all topics + for (int i = 0; i < numTopics; i++) { + // remove partition part + const std::string& topicName = response.topics(i); + int pos = topicName.find("-partition-"); + std::string filteredName = topicName.substr(0, pos); + + // filter duped topic name + if (topicSet.find(filteredName) == topicSet.end()) { + topicSet.insert(filteredName); + } + } + + NamespaceTopicsPtr topicsPtr = + boost::make_shared<std::vector<std::string>>(topicSet.begin(), topicSet.end()); + + getTopicsPromise.setValue(topicsPtr); + } else { + lock.unlock(); + LOG_WARN( + "GetTopicsOfNamespaceResponse command - Received unknown request id from " + "server: " + << response.request_id()); + } + break; + } + default: { LOG_WARN(cnxString_ << "Received invalid message from server"); close(); @@ -1281,4 +1336,21 @@ Future<Result, MessageId> ClientConnection::newGetLastMessageId(uint64_t consume return promise.getFuture(); } +Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(const std::string& nsName, + uint64_t requestId) { + Lock lock(mutex_); + Promise<Result, NamespaceTopicsPtr> promise; + if (isClosed()) { + lock.unlock(); + LOG_ERROR(cnxString_ << "Client is not connected to the broker"); + promise.setFailed(ResultNotConnected); + return promise.getFuture(); + } + + pendingGetNamespaceTopicsRequests_.insert(std::make_pair(requestId, promise)); + 
lock.unlock(); + sendCommand(Commands::newGetTopicsOfNamespace(nsName, requestId)); + return promise.getFuture(); +} + } // namespace pulsar diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h index 860ca6a58b..bdbc8e536b 100644 --- a/pulsar-client-cpp/lib/ClientConnection.h +++ b/pulsar-client-cpp/lib/ClientConnection.h @@ -70,6 +70,8 @@ struct OpSendMsg; typedef std::pair<std::string, int64_t> ResponseData; +typedef boost::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr; + class ClientConnection : public boost::enable_shared_from_this<ClientConnection> { enum State { @@ -81,7 +83,7 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection> public: typedef boost::shared_ptr<boost::asio::ip::tcp::socket> SocketPtr; - typedef boost::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket&> > TlsSocketPtr; + typedef boost::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket&>> TlsSocketPtr; typedef boost::shared_ptr<ClientConnection> ConnectionPtr; typedef boost::function<void(const boost::system::error_code&, ConnectionPtr)> ConnectionListener; typedef std::vector<ConnectionListener>::iterator ListenerIterator; @@ -144,6 +146,8 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection> Future<Result, MessageId> newGetLastMessageId(uint64_t consumerId, uint64_t requestId); + Future<Result, NamespaceTopicsPtr> newGetTopicsOfNamespace(const std::string& nsName, uint64_t requestId); + private: struct PendingRequestData { Promise<Result, ResponseData> promise; @@ -264,12 +268,15 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection> typedef std::map<long, ConsumerImplWeakPtr> ConsumersMap; ConsumersMap consumers_; - typedef std::map<uint64_t, Promise<Result, BrokerConsumerStatsImpl> > PendingConsumerStatsMap; + typedef std::map<uint64_t, Promise<Result, BrokerConsumerStatsImpl>> PendingConsumerStatsMap; 
PendingConsumerStatsMap pendingConsumerStatsMap_; - typedef std::map<long, Promise<Result, MessageId> > PendingGetLastMessageIdRequestsMap; + typedef std::map<long, Promise<Result, MessageId>> PendingGetLastMessageIdRequestsMap; PendingGetLastMessageIdRequestsMap pendingGetLastMessageIdRequests_; + typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> PendingGetNamespaceTopicsMap; + PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_; + boost::mutex mutex_; typedef boost::unique_lock<boost::mutex> Lock; diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc index 376892633b..ec113fc4e6 100644 --- a/pulsar-client-cpp/lib/ClientImpl.cc +++ b/pulsar-client-cpp/lib/ClientImpl.cc @@ -25,6 +25,7 @@ #include "PartitionedProducerImpl.h" #include "PartitionedConsumerImpl.h" #include "MultiTopicsConsumerImpl.h" +#include "PatternMultiTopicsConsumerImpl.h" #include "SimpleLoggerImpl.h" #include "Log4CxxLogger.h" #include <boost/bind.hpp> @@ -35,6 +36,7 @@ #include <lib/HTTPLookupService.h> #include <lib/TopicName.h> #include <algorithm> +#include <regex> DECLARE_LOG_OBJECT() @@ -119,6 +121,9 @@ ExecutorServiceProviderPtr ClientImpl::getListenerExecutorProvider() { return li ExecutorServiceProviderPtr ClientImpl::getPartitionListenerExecutorProvider() { return partitionListenerExecutorProvider_; } + +LookupServicePtr ClientImpl::getLookup() { return lookupServicePtr_; } + void ClientImpl::createProducerAsync(const std::string& topic, ProducerConfiguration conf, CreateProducerCallback callback) { TopicNamePtr topicName; @@ -212,6 +217,59 @@ void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDat consumers_.push_back(reader->getConsumer()); } +void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& consumerName, + const ConsumerConfiguration& conf, SubscribeCallback callback) { + TopicNamePtr topicNamePtr = TopicName::get(regexPattern); + + Lock lock(mutex_); + if 
(state_ != Open) { + lock.unlock(); + callback(ResultAlreadyClosed, Consumer()); + return; + } else { + lock.unlock(); + if (!topicNamePtr) { + LOG_ERROR("Topic pattern not valid: " << regexPattern); + callback(ResultInvalidTopicName, Consumer()); + return; + } + } + + NamespaceNamePtr nsName = topicNamePtr->getNamespaceName(); + + lookupServicePtr_->getTopicsOfNamespaceAsync(nsName).addListener( + boost::bind(&ClientImpl::createPatternMultiTopicsConsumer, shared_from_this(), _1, _2, regexPattern, + consumerName, conf, callback)); +} + +void ClientImpl::createPatternMultiTopicsConsumer(const Result result, const NamespaceTopicsPtr topics, + const std::string& regexPattern, + const std::string& consumerName, + const ConsumerConfiguration& conf, + SubscribeCallback callback) { + if (result == ResultOk) { + ConsumerImplBasePtr consumer; + + std::regex pattern(regexPattern); + + NamespaceTopicsPtr matchTopics = + PatternMultiTopicsConsumerImpl::topicsPatternFilter(*topics, pattern); + + consumer = boost::make_shared<PatternMultiTopicsConsumerImpl>( + shared_from_this(), regexPattern, *matchTopics, consumerName, conf, lookupServicePtr_); + + consumer->getConsumerCreatedFuture().addListener( + boost::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), _1, _2, callback, consumer)); + Lock lock(mutex_); + consumers_.push_back(consumer); + lock.unlock(); + consumer->start(); + } else { + LOG_ERROR("Error Getting topicsOfNameSpace while createPatternMultiTopicsConsumer: " << result); + callback(result, Consumer()); + } +} + void ClientImpl::subscribeAsync(const std::vector<std::string>& topics, const std::string& consumerName, const ConsumerConfiguration& conf, SubscribeCallback callback) { TopicNamePtr topicNamePtr; diff --git a/pulsar-client-cpp/lib/ClientImpl.h b/pulsar-client-cpp/lib/ClientImpl.h index 550298b67f..54d459d097 100644 --- a/pulsar-client-cpp/lib/ClientImpl.h +++ b/pulsar-client-cpp/lib/ClientImpl.h @@ -60,6 +60,9 @@ class ClientImpl : public 
boost::enable_shared_from_this<ClientImpl> { void subscribeAsync(const std::vector<std::string>& topics, const std::string& consumerName, const ConsumerConfiguration& conf, SubscribeCallback callback); + void subscribeWithRegexAsync(const std::string& regexPattern, const std::string& consumerName, + const ConsumerConfiguration& conf, SubscribeCallback callback); + void createReaderAsync(const std::string& topic, const MessageId& startMessageId, const ReaderConfiguration& conf, ReaderCallback callback); @@ -82,6 +85,7 @@ class ClientImpl : public boost::enable_shared_from_this<ClientImpl> { ExecutorServiceProviderPtr getIOExecutorProvider(); ExecutorServiceProviderPtr getListenerExecutorProvider(); ExecutorServiceProviderPtr getPartitionListenerExecutorProvider(); + LookupServicePtr getLookup(); friend class PulsarFriend; private: @@ -106,6 +110,10 @@ class ClientImpl : public boost::enable_shared_from_this<ClientImpl> { void handleClose(Result result, SharedInt remaining, ResultCallback callback); + void createPatternMultiTopicsConsumer(const Result result, const NamespaceTopicsPtr topics, + const std::string& regexPattern, const std::string& consumerName, + const ConsumerConfiguration& conf, SubscribeCallback callback); + enum State { Open, diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc index 13bf99a870..8a1933bde7 100644 --- a/pulsar-client-cpp/lib/Commands.cc +++ b/pulsar-client-cpp/lib/Commands.cc @@ -324,6 +324,18 @@ SharedBuffer Commands::newGetLastMessageId(uint64_t consumerId, uint64_t request return buffer; } +SharedBuffer Commands::newGetTopicsOfNamespace(const std::string& nsName, uint64_t requestId) { + BaseCommand cmd; + cmd.set_type(BaseCommand::GET_TOPICS_OF_NAMESPACE); + CommandGetTopicsOfNamespace* getTopics = cmd.mutable_gettopicsofnamespace(); + getTopics->set_request_id(requestId); + getTopics->set_namespace_(nsName); + + const SharedBuffer buffer = writeMessageWithSize(cmd); + 
cmd.clear_gettopicsofnamespace(); + return buffer; +} + std::string Commands::messageType(BaseCommand_Type type) { switch (type) { case BaseCommand::CONNECT: @@ -416,6 +428,12 @@ std::string Commands::messageType(BaseCommand_Type type) { case BaseCommand::GET_LAST_MESSAGE_ID_RESPONSE: return "GET_LAST_MESSAGE_ID_RESPONSE"; break; + case BaseCommand::GET_TOPICS_OF_NAMESPACE: + return "GET_TOPICS_OF_NAMESPACE"; + break; + case BaseCommand::GET_TOPICS_OF_NAMESPACE_RESPONSE: + return "GET_TOPICS_OF_NAMESPACE_RESPONSE"; + break; }; } diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h index 53fb1bb247..d9b8589fe2 100644 --- a/pulsar-client-cpp/lib/Commands.h +++ b/pulsar-client-cpp/lib/Commands.h @@ -112,6 +112,7 @@ class Commands { static SharedBuffer newSeek(uint64_t consumerId, uint64_t requestId, const MessageId& messageId); static SharedBuffer newGetLastMessageId(uint64_t consumerId, uint64_t requestId); + static SharedBuffer newGetTopicsOfNamespace(const std::string& nsName, uint64_t requestId); private: Commands(); diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc b/pulsar-client-cpp/lib/ConsumerConfiguration.cc index 0c145c1cf7..058ca57e1d 100644 --- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc +++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc @@ -105,4 +105,10 @@ bool ConsumerConfiguration::isReadCompacted() const { return impl_->readCompacte void ConsumerConfiguration::setReadCompacted(bool compacted) { impl_->readCompacted = compacted; } +void ConsumerConfiguration::setPatternAutoDiscoveryPeriod(int periodInSeconds) { + impl_->patternAutoDiscoveryPeriod = periodInSeconds; +} + +int ConsumerConfiguration::getPatternAutoDiscoveryPeriod() const { return impl_->patternAutoDiscoveryPeriod; } + } // namespace pulsar diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h index eb0c3746bf..0cc0c72838 100644 --- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h 
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h @@ -35,6 +35,7 @@ struct ConsumerConfigurationImpl { CryptoKeyReaderPtr cryptoKeyReader; ConsumerCryptoFailureAction cryptoFailureAction; bool readCompacted; + int patternAutoDiscoveryPeriod; ConsumerConfigurationImpl() : unAckedMessagesTimeoutMs(0), consumerType(ConsumerExclusive), @@ -45,7 +46,8 @@ struct ConsumerConfigurationImpl { maxTotalReceiverQueueSizeAcrossPartitions(50000), cryptoKeyReader(), cryptoFailureAction(ConsumerCryptoFailureAction::FAIL), - readCompacted(false) {} + readCompacted(false), + patternAutoDiscoveryPeriod(60) {} }; } // namespace pulsar #endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */ diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc b/pulsar-client-cpp/lib/HTTPLookupService.cc index 36d11f5954..fe27e8d9f3 100644 --- a/pulsar-client-cpp/lib/HTTPLookupService.cc +++ b/pulsar-client-cpp/lib/HTTPLookupService.cc @@ -68,8 +68,9 @@ Future<Result, LookupDataResultPtr> HTTPLookupService::lookupAsync(const std::st << topicName->getEncodedLocalName(); } - executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::sendHTTPRequest, shared_from_this(), - promise, completeUrlStream.str(), Lookup)); + executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::handleLookupHTTPRequest, + shared_from_this(), promise, completeUrlStream.str(), + Lookup)); return promise.getFuture(); } @@ -89,8 +90,27 @@ Future<Result, LookupDataResultPtr> HTTPLookupService::getPartitionMetadataAsync << '/' << PARTITION_METHOD_NAME; } - executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::sendHTTPRequest, shared_from_this(), - promise, completeUrlStream.str(), PartitionMetaData)); + executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::handleLookupHTTPRequest, + shared_from_this(), promise, completeUrlStream.str(), + PartitionMetaData)); + return promise.getFuture(); +} + +Future<Result, NamespaceTopicsPtr> HTTPLookupService::getTopicsOfNamespaceAsync( + const 
NamespaceNamePtr &nsName) { + NamespaceTopicsPromise promise; + std::stringstream completeUrlStream; + + if (nsName->isV2()) { + completeUrlStream << adminUrl_ << ADMIN_PATH_V2 << "namespaces" << '/' << nsName->toString() << '/' + << "topics"; + } else { + completeUrlStream << adminUrl_ << ADMIN_PATH_V1 << "namespaces" << '/' << nsName->toString() << '/' + << "destinations"; + } + + executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::handleNamespaceTopicsHTTPRequest, + shared_from_this(), promise, completeUrlStream.str())); return promise.getFuture(); } @@ -99,19 +119,28 @@ static size_t curlWriteCallback(void *contents, size_t size, size_t nmemb, void return size * nmemb; } -void HTTPLookupService::sendHTTPRequest(LookupPromise promise, const std::string completeUrl, - RequestType requestType) { +void HTTPLookupService::handleNamespaceTopicsHTTPRequest(NamespaceTopicsPromise promise, + const std::string completeUrl) { + std::string responseData; + Result result = sendHTTPRequest(completeUrl, responseData); + + if (result != ResultOk) { + promise.setFailed(result); + } else { + promise.setValue(parseNamespaceTopicsData(responseData)); + } +} + +Result HTTPLookupService::sendHTTPRequest(const std::string completeUrl, std::string &responseData) { CURL *handle; CURLcode res; - std::string responseData; std::string version = std::string("Pulsar-CPP-v") + _PULSAR_VERSION_; handle = curl_easy_init(); if (!handle) { LOG_ERROR("Unable to curl_easy_init for url " << completeUrl); - promise.setFailed(ResultLookupError); // No curl_easy_cleanup required since handle not initialized - return; + return ResultLookupError; } // set URL curl_easy_setopt(handle, CURLOPT_URL, completeUrl.c_str()); @@ -148,9 +177,8 @@ void HTTPLookupService::sendHTTPRequest(LookupPromise promise, const std::string "All Authentication methods should have AuthenticationData and return true on getAuthData for " "url " << completeUrl); - promise.setFailed(authResult); 
curl_easy_cleanup(handle); - return; + return authResult; } struct curl_slist *list = NULL; if (authDataContent->hasDataForHttp()) { @@ -158,7 +186,7 @@ void HTTPLookupService::sendHTTPRequest(LookupPromise promise, const std::string } curl_easy_setopt(handle, CURLOPT_HTTPHEADER, list); - LOG_INFO("Curl Lookup Request sent for" << completeUrl); + LOG_INFO("Curl Lookup Request sent for " << completeUrl); // Make get call to server res = curl_easy_perform(handle); @@ -166,16 +194,17 @@ void HTTPLookupService::sendHTTPRequest(LookupPromise promise, const std::string // Free header list curl_slist_free_all(list); + Result retResult = ResultOk; + switch (res) { case CURLE_OK: long response_code; curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &response_code); LOG_INFO("Response received for url " << completeUrl << " code " << response_code); if (response_code == 200) { - promise.setValue((requestType == PartitionMetaData) ? parsePartitionData(responseData) - : parseLookupData(responseData)); + retResult = ResultOk; } else { - promise.setFailed(ResultLookupError); + retResult = ResultLookupError; } break; case CURLE_COULDNT_CONNECT: @@ -183,22 +212,23 @@ void HTTPLookupService::sendHTTPRequest(LookupPromise promise, const std::string case CURLE_COULDNT_RESOLVE_HOST: case CURLE_HTTP_RETURNED_ERROR: LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res); - promise.setFailed(ResultConnectError); + retResult = ResultConnectError; break; case CURLE_READ_ERROR: LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res); - promise.setFailed(ResultReadError); + retResult = ResultReadError; break; case CURLE_OPERATION_TIMEDOUT: LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res); - promise.setFailed(ResultTimeout); + retResult = ResultTimeout; break; default: LOG_ERROR("Response failed for url " << completeUrl << ". 
Error Code " << res); - promise.setFailed(ResultLookupError); + retResult = ResultLookupError; break; } curl_easy_cleanup(handle); + return retResult; } LookupDataResultPtr HTTPLookupService::parsePartitionData(const std::string &json) { @@ -243,4 +273,48 @@ LookupDataResultPtr HTTPLookupService::parseLookupData(const std::string &json) LOG_INFO("parseLookupData = " << *lookupDataResultPtr); return lookupDataResultPtr; } + +NamespaceTopicsPtr HTTPLookupService::parseNamespaceTopicsData(const std::string &json) { + Json::Value root; + Json::Reader reader; + if (!reader.parse(json, root, false)) { + LOG_ERROR("Failed to parse json of Topics of Namespace: " << reader.getFormatedErrorMessages() + << "\nInput Json = " << json); + return NamespaceTopicsPtr(); + } + + Json::Value topicsArray = root["topics"]; + std::set<std::string> topicSet; + // get all topics + for (int i = 0; i < topicsArray.size(); i++) { + // remove partition part + const std::string &topicName = topicsArray[i].asString(); + int pos = topicName.find("-partition-"); + std::string filteredName = topicName.substr(0, pos); + + // filter duped topic name + if (topicSet.find(filteredName) == topicSet.end()) { + topicSet.insert(filteredName); + } + } + + NamespaceTopicsPtr topicsResultPtr = + boost::make_shared<std::vector<std::string>>(topicSet.begin(), topicSet.end()); + + return topicsResultPtr; +} + +void HTTPLookupService::handleLookupHTTPRequest(LookupPromise promise, const std::string completeUrl, + RequestType requestType) { + std::string responseData; + Result result = sendHTTPRequest(completeUrl, responseData); + + if (result != ResultOk) { + promise.setFailed(result); + } else { + promise.setValue((requestType == PartitionMetaData) ? 
parsePartitionData(responseData) + : parseLookupData(responseData)); + } +} + } // namespace pulsar diff --git a/pulsar-client-cpp/lib/HTTPLookupService.h b/pulsar-client-cpp/lib/HTTPLookupService.h index d7fff331cb..66cd2514db 100644 --- a/pulsar-client-cpp/lib/HTTPLookupService.h +++ b/pulsar-client-cpp/lib/HTTPLookupService.h @@ -52,7 +52,12 @@ class HTTPLookupService : public LookupService, public boost::enable_shared_from static LookupDataResultPtr parsePartitionData(const std::string&); static LookupDataResultPtr parseLookupData(const std::string&); - void sendHTTPRequest(LookupPromise, const std::string, RequestType); + static NamespaceTopicsPtr parseNamespaceTopicsData(const std::string&); + + void handleLookupHTTPRequest(LookupPromise, const std::string, RequestType); + void handleNamespaceTopicsHTTPRequest(NamespaceTopicsPromise promise, const std::string completeUrl); + + Result sendHTTPRequest(const std::string completeUrl, std::string& responseData); public: HTTPLookupService(const std::string&, const ClientConfiguration&, const AuthenticationPtr&); @@ -60,6 +65,8 @@ class HTTPLookupService : public LookupService, public boost::enable_shared_from Future<Result, LookupDataResultPtr> lookupAsync(const std::string&); Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr&); + + Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName); }; } // namespace pulsar diff --git a/pulsar-client-cpp/lib/LookupService.h b/pulsar-client-cpp/lib/LookupService.h index 48d55954a9..122126326a 100644 --- a/pulsar-client-cpp/lib/LookupService.h +++ b/pulsar-client-cpp/lib/LookupService.h @@ -26,6 +26,10 @@ #include <lib/TopicName.h> namespace pulsar { +typedef boost::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr; +typedef Promise<Result, NamespaceTopicsPtr> NamespaceTopicsPromise; +typedef boost::shared_ptr<Promise<Result, NamespaceTopicsPtr>> NamespaceTopicsPromisePtr; + class LookupService { public: 
/* @@ -42,6 +46,13 @@ class LookupService { */ virtual Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName) = 0; + /** + * @param nsName - namespace-name + * + * Returns all the topic names for a given namespace. + */ + virtual Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) = 0; + virtual ~LookupService() {} }; diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h index 6425687595..3b1d9853a9 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h @@ -80,7 +80,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase, // not supported virtual void seekAsync(const MessageId& msgId, ResultCallback callback); - private: + protected: const ClientImplPtr client_; const std::string subscriptionName_; std::string consumerStr_; diff --git a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc new file mode 100644 index 0000000000..95f8a36ec2 --- /dev/null +++ b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "PatternMultiTopicsConsumerImpl.h" + +DECLARE_LOG_OBJECT() + +using namespace pulsar; + +PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl(ClientImplPtr client, + const std::string pattern, + const std::vector<std::string>& topics, + const std::string& subscriptionName, + const ConsumerConfiguration& conf, + const LookupServicePtr lookupServicePtr_) + : MultiTopicsConsumerImpl(client, topics, subscriptionName, TopicName::get(pattern), conf, + lookupServicePtr_), + patternString_(pattern), + pattern_(std::regex(pattern)), + autoDiscoveryTimer_(), + autoDiscoveryRunning_(false) {} + +const std::regex PatternMultiTopicsConsumerImpl::getPattern() { return pattern_; } + +void PatternMultiTopicsConsumerImpl::resetAutoDiscoveryTimer() { + autoDiscoveryRunning_ = false; + autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod())); + autoDiscoveryTimer_->async_wait( + boost::bind(&PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask, this, _1)); +} + +void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const boost::system::error_code& err) { + if (err == boost::asio::error::operation_aborted) { + LOG_DEBUG(getName() << "Timer cancelled: " << err.message()); + return; + } else if (err) { + LOG_ERROR(getName() << "Timer error: " << err.message()); + return; + } + + if (state_ != Ready) { + LOG_ERROR("Error in autoDiscoveryTimerTask consumer state not ready: " << state_); + resetAutoDiscoveryTimer(); + return; + } + + if (autoDiscoveryRunning_) { + LOG_DEBUG("autoDiscoveryTimerTask still running, cancel this running. "); + return; + } + + autoDiscoveryRunning_ = true; + + // already get namespace from pattern. 
+ assert(namespaceName_); + + lookupServicePtr_->getTopicsOfNamespaceAsync(namespaceName_) + .addListener(boost::bind(&PatternMultiTopicsConsumerImpl::timerGetTopicsOfNamespace, this, _1, _2)); +} + +void PatternMultiTopicsConsumerImpl::timerGetTopicsOfNamespace(const Result result, + const NamespaceTopicsPtr topics) { + if (result != ResultOk) { + LOG_ERROR("Error in Getting topicsOfNameSpace. result: " << result); + resetAutoDiscoveryTimer(); + return; + } + + NamespaceTopicsPtr newTopics = PatternMultiTopicsConsumerImpl::topicsPatternFilter(*topics, pattern_); + // get old topics in consumer: + NamespaceTopicsPtr oldTopics = boost::make_shared<std::vector<std::string>>(); + for (std::map<std::string, int>::iterator it = topicsPartitions_.begin(); it != topicsPartitions_.end(); + it++) { + oldTopics->push_back(it->first); + } + NamespaceTopicsPtr topicsAdded = topicsListsMinus(*newTopics, *oldTopics); + NamespaceTopicsPtr topicsRemoved = topicsListsMinus(*oldTopics, *newTopics); + + // callback method when removed topics all un-subscribed. + ResultCallback topicsRemovedCallback = [this](Result result) { + if (result != ResultOk) { + LOG_ERROR("Failed to unsubscribe topics: " << result); + } + resetAutoDiscoveryTimer(); + }; + + // callback method when added topics all subscribed. + ResultCallback topicsAddedCallback = [this, topicsRemoved, topicsRemovedCallback](Result result) { + if (result == ResultOk) { + // call to unsubscribe all removed topics. 
+ onTopicsRemoved(topicsRemoved, topicsRemovedCallback); + } else { + resetAutoDiscoveryTimer(); + } + }; + + // call to subscribe new added topics, then in its callback do unsubscribe + onTopicsAdded(topicsAdded, topicsAddedCallback); +} + +void PatternMultiTopicsConsumerImpl::onTopicsAdded(NamespaceTopicsPtr addedTopics, ResultCallback callback) { + // start call subscribeOneTopicAsync for each single topic + + if (addedTopics->empty()) { + LOG_DEBUG("no topics need subscribe"); + callback(ResultOk); + return; + } + int topicsNumber = addedTopics->size(); + + boost::shared_ptr<std::atomic<int>> topicsNeedCreate = boost::make_shared<std::atomic<int>>(topicsNumber); + // subscribe for each passed in topic + for (std::vector<std::string>::const_iterator itr = addedTopics->begin(); itr != addedTopics->end(); + itr++) { + MultiTopicsConsumerImpl::subscribeOneTopicAsync(*itr).addListener( + boost::bind(&PatternMultiTopicsConsumerImpl::handleOneTopicAdded, this, _1, *itr, + topicsNeedCreate, callback)); + } +} + +void PatternMultiTopicsConsumerImpl::handleOneTopicAdded(const Result result, const std::string& topic, + boost::shared_ptr<std::atomic<int>> topicsNeedCreate, + ResultCallback callback) { + int previous = topicsNeedCreate->fetch_sub(1); + assert(previous > 0); + + if (result != ResultOk) { + LOG_ERROR("Failed when subscribed to topic " << topic << " Error - " << result); + callback(result); + return; + } + + if (topicsNeedCreate->load() == 0) { + LOG_DEBUG("Subscribed all new added topics"); + callback(result); + } +} + +void PatternMultiTopicsConsumerImpl::onTopicsRemoved(NamespaceTopicsPtr removedTopics, + ResultCallback callback) { + // start call subscribeOneTopicAsync for each single topic + if (removedTopics->empty()) { + LOG_DEBUG("no topics need unsubscribe"); + callback(ResultOk); + return; + } + int topicsNumber = removedTopics->size(); + + boost::shared_ptr<std::atomic<int>> topicsNeedUnsub = boost::make_shared<std::atomic<int>>(topicsNumber); + 
ResultCallback oneTopicUnsubscribedCallback = [this, topicsNeedUnsub, callback](Result result) { + int previous = topicsNeedUnsub->fetch_sub(1); + assert(previous > 0); + + if (result != ResultOk) { + LOG_ERROR("Failed when unsubscribe to one topic. Error - " << result); + callback(result); + return; + } + + if (topicsNeedUnsub->load() == 0) { + LOG_DEBUG("unSubscribed all needed topics"); + callback(result); + } + }; + + // unsubscribe for each passed in topic + for (std::vector<std::string>::const_iterator itr = removedTopics->begin(); itr != removedTopics->end(); + itr++) { + MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(*itr, oneTopicUnsubscribedCallback); + } +} + +NamespaceTopicsPtr PatternMultiTopicsConsumerImpl::topicsPatternFilter(const std::vector<std::string>& topics, + const std::regex& pattern) { + NamespaceTopicsPtr topicsResultPtr = boost::make_shared<std::vector<std::string>>(); + + for (std::vector<std::string>::const_iterator itr = topics.begin(); itr != topics.end(); itr++) { + if (std::regex_match(*itr, pattern)) { + topicsResultPtr->push_back(*itr); + } + } + return topicsResultPtr; +} + +NamespaceTopicsPtr PatternMultiTopicsConsumerImpl::topicsListsMinus(std::vector<std::string>& list1, + std::vector<std::string>& list2) { + NamespaceTopicsPtr topicsResultPtr = boost::make_shared<std::vector<std::string>>(); + std::remove_copy_if(list1.begin(), list1.end(), std::back_inserter(*topicsResultPtr), + [&list2](const std::string& arg) { + return (std::find(list2.begin(), list2.end(), arg) != list2.end()); + }); + + return topicsResultPtr; +} + +void PatternMultiTopicsConsumerImpl::start() { + MultiTopicsConsumerImpl::start(); + + LOG_DEBUG("PatternMultiTopicsConsumerImpl start autoDiscoveryTimer_."); + + // Init autoDiscoveryTimer task only once, wait for the timeout to happen + if (!autoDiscoveryTimer_ && conf_.getPatternAutoDiscoveryPeriod() > 0) { + autoDiscoveryTimer_ = client_->getIOExecutorProvider()->get()->createDeadlineTimer(); + 
autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod())); + autoDiscoveryTimer_->async_wait( + boost::bind(&PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask, this, _1)); + } +} + +void PatternMultiTopicsConsumerImpl::shutdown() { + Lock lock(mutex_); + state_ = Closed; + autoDiscoveryTimer_->cancel(); + multiTopicsConsumerCreatedPromise_.setFailed(ResultAlreadyClosed); +} + +void PatternMultiTopicsConsumerImpl::closeAsync(ResultCallback callback) { + MultiTopicsConsumerImpl::closeAsync(callback); + autoDiscoveryTimer_->cancel(); +} diff --git a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.h new file mode 100644 index 0000000000..503dc993f3 --- /dev/null +++ b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.h @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +#ifndef PULSAR_PATTERN_MULTI_TOPICS_CONSUMER_HEADER +#define PULSAR_PATTERN_MULTI_TOPICS_CONSUMER_HEADER +#include "ConsumerImpl.h" +#include "ClientImpl.h" +#include <regex> +#include "boost/enable_shared_from_this.hpp" +#include <lib/TopicName.h> +#include <lib/NamespaceName.h> +#include "MultiTopicsConsumerImpl.h" + +namespace pulsar { + +class PatternMultiTopicsConsumerImpl; + +class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl { + public: + // currently we support topics under same namespace, so `patternString` is a regex, + // which only contains after namespace part. + // when subscribe, client will first get all topics that match given pattern. + // `topics` contains the topics that match `patternString`. + PatternMultiTopicsConsumerImpl(ClientImplPtr client, const std::string patternString, + const std::vector<std::string>& topics, + const std::string& subscriptionName, const ConsumerConfiguration& conf, + const LookupServicePtr lookupServicePtr_); + + const std::regex getPattern(); + + void autoDiscoveryTimerTask(const boost::system::error_code& err); + + // filter input `topics` with given `pattern`, return matched topics + static NamespaceTopicsPtr topicsPatternFilter(const std::vector<std::string>& topics, + const std::regex& pattern); + + // Find out topics, which are in `list1` but not in `list2`. 
+ static NamespaceTopicsPtr topicsListsMinus(std::vector<std::string>& list1, + std::vector<std::string>& list2); + + virtual void closeAsync(ResultCallback callback); + virtual void start(); + virtual void shutdown(); + + private: + const std::string patternString_; + const std::regex pattern_; + typedef boost::shared_ptr<boost::asio::deadline_timer> TimerPtr; + TimerPtr autoDiscoveryTimer_; + bool autoDiscoveryRunning_; + + void resetAutoDiscoveryTimer(); + void timerGetTopicsOfNamespace(const Result result, const NamespaceTopicsPtr topics); + void onTopicsAdded(NamespaceTopicsPtr addedTopics, ResultCallback callback); + void onTopicsRemoved(NamespaceTopicsPtr removedTopics, ResultCallback callback); + void handleOneTopicAdded(const Result result, const std::string& topic, + boost::shared_ptr<std::atomic<int>> topicsNeedCreate, ResultCallback callback); +}; + +} // namespace pulsar +#endif // PULSAR_PATTERN_MULTI_TOPICS_CONSUMER_HEADER diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index cf28b34130..d4c1df80c0 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -34,6 +34,7 @@ #include <set> #include <vector> #include <lib/MultiTopicsConsumerImpl.h> +#include <lib/PatternMultiTopicsConsumerImpl.h> #include "lib/Future.h" #include "lib/Utils.h" DECLARE_LOG_OBJECT() @@ -1679,3 +1680,259 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerPubSub) { client.shutdown(); } + +TEST(BasicEndToEndTest, testPatternTopicsConsumerInvalid) { + Client client(lookupUrl); + + // invalid namespace + std::string pattern = "invalidDomain://prop/unit/ns/patternMultiTopicsConsumerInvalid.*"; + std::string subName = "testPatternMultiTopicsConsumerInvalid"; + + Consumer consumer; + Promise<Result, Consumer> consumerPromise; + client.subscribeWithRegexAsync(pattern, subName, WaitForCallbackValue<Consumer>(consumerPromise)); + Future<Result, Consumer> consumerFuture = 
consumerPromise.getFuture(); + Result result = consumerFuture.get(consumer); + ASSERT_EQ(ResultInvalidTopicName, result); + + client.shutdown(); +} + +// create 4 topics, in which 3 topics match the pattern, +// verify PatternMultiTopicsConsumer subscribed matched topics, +// and only receive messages from matched topics. +TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerPubSub) { + Client client(lookupUrl); + std::string pattern = "persistent://prop/unit/ns1/patternMultiTopicsConsumer.*"; + + std::string subName = "testPatternMultiTopicsConsumer"; + std::string topicName1 = "persistent://prop/unit/ns1/patternMultiTopicsConsumerPubSub1"; + std::string topicName2 = "persistent://prop/unit/ns1/patternMultiTopicsConsumerPubSub2"; + std::string topicName3 = "persistent://prop/unit/ns1/patternMultiTopicsConsumerPubSub3"; + // This will not match pattern + std::string topicName4 = "persistent://prop/unit/ns1/patternMultiTopicsNotMatchPubSub4"; + + // call admin api to make topics partitioned + std::string url1 = + adminUrl + "admin/persistent/prop/unit/ns1/patternMultiTopicsConsumerPubSub1/partitions"; + std::string url2 = + adminUrl + "admin/persistent/prop/unit/ns1/patternMultiTopicsConsumerPubSub2/partitions"; + std::string url3 = + adminUrl + "admin/persistent/prop/unit/ns1/patternMultiTopicsConsumerPubSub3/partitions"; + std::string url4 = + adminUrl + "admin/persistent/prop/unit/ns1/patternMultiTopicsNotMatchPubSub4/partitions"; + + int res = makePutRequest(url1, "2"); + ASSERT_FALSE(res != 204 && res != 409); + res = makePutRequest(url2, "3"); + ASSERT_FALSE(res != 204 && res != 409); + res = makePutRequest(url3, "4"); + ASSERT_FALSE(res != 204 && res != 409); + res = makePutRequest(url4, "4"); + ASSERT_FALSE(res != 204 && res != 409); + + Producer producer1; + Result result = client.createProducer(topicName1, producer1); + ASSERT_EQ(ResultOk, result); + Producer producer2; + result = client.createProducer(topicName2, producer2); + ASSERT_EQ(ResultOk, 
result); + Producer producer3; + result = client.createProducer(topicName3, producer3); + ASSERT_EQ(ResultOk, result); + Producer producer4; + result = client.createProducer(topicName4, producer4); + ASSERT_EQ(ResultOk, result); + + LOG_INFO("created 3 producers that match, with partitions: 2, 3, 4, and 1 producer not match"); + + int messageNumber = 100; + ConsumerConfiguration consConfig; + consConfig.setConsumerType(ConsumerShared); + consConfig.setReceiverQueueSize(10); // size for each sub-consumer + Consumer consumer; + Promise<Result, Consumer> consumerPromise; + client.subscribeWithRegexAsync(pattern, subName, consConfig, + WaitForCallbackValue<Consumer>(consumerPromise)); + Future<Result, Consumer> consumerFuture = consumerPromise.getFuture(); + result = consumerFuture.get(consumer); + ASSERT_EQ(ResultOk, result); + ASSERT_EQ(consumer.getSubscriptionName(), subName); + LOG_INFO("created topics consumer on a pattern that match 3 topics"); + + std::string msgContent = "msg-content"; + LOG_INFO("Publishing 100 messages by producer 1 synchronously"); + for (int msgNum = 0; msgNum < messageNumber; msgNum++) { + std::stringstream stream; + stream << msgContent << msgNum; + Message msg = MessageBuilder().setContent(stream.str()).build(); + ASSERT_EQ(ResultOk, producer1.send(msg)); + } + + msgContent = "msg-content2"; + LOG_INFO("Publishing 100 messages by producer 2 synchronously"); + for (int msgNum = 0; msgNum < messageNumber; msgNum++) { + std::stringstream stream; + stream << msgContent << msgNum; + Message msg = MessageBuilder().setContent(stream.str()).build(); + ASSERT_EQ(ResultOk, producer2.send(msg)); + } + + msgContent = "msg-content3"; + LOG_INFO("Publishing 100 messages by producer 3 synchronously"); + for (int msgNum = 0; msgNum < messageNumber; msgNum++) { + std::stringstream stream; + stream << msgContent << msgNum; + Message msg = MessageBuilder().setContent(stream.str()).build(); + ASSERT_EQ(ResultOk, producer3.send(msg)); + } + + msgContent = 
"msg-content4"; + LOG_INFO("Publishing 100 messages by producer 4 synchronously"); + for (int msgNum = 0; msgNum < messageNumber; msgNum++) { + std::stringstream stream; + stream << msgContent << msgNum; + Message msg = MessageBuilder().setContent(stream.str()).build(); + ASSERT_EQ(ResultOk, producer4.send(msg)); + } + + LOG_INFO("Consuming and acking 300 messages by multiTopicsConsumer"); + for (int i = 0; i < 3 * messageNumber; i++) { + Message m; + ASSERT_EQ(ResultOk, consumer.receive(m, 1000)); + ASSERT_EQ(ResultOk, consumer.acknowledge(m)); + } + LOG_INFO("Consumed and acked 300 messages by multiTopicsConsumer"); + + // verify no more to receive, because producer4 not match pattern + Message m; + ASSERT_EQ(ResultTimeout, consumer.receive(m, 1000)); + + ASSERT_EQ(ResultOk, consumer.unsubscribe()); + + client.shutdown(); +} + +// create a pattern consumer, which contains no match topics at beginning. +// create 4 topics, in which 3 topics match the pattern. +// verify PatternMultiTopicsConsumer subscribed matched topics, after a while, +// and only receive messages from matched topics. +TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerAutoDiscovery) { + Client client(lookupUrl); + std::string pattern = "persistent://prop/unit/ns2/patternTopicsAutoConsumer.*"; + Result result; + std::string subName = "testPatternTopicsAutoConsumer"; + + // 1. create a pattern consumer, which contains no match topics at beginning. 
+ ConsumerConfiguration consConfig; + consConfig.setConsumerType(ConsumerShared); + consConfig.setReceiverQueueSize(10); // size for each sub-consumer + consConfig.setPatternAutoDiscoveryPeriod(1); // set waiting time for auto discovery + Consumer consumer; + Promise<Result, Consumer> consumerPromise; + client.subscribeWithRegexAsync(pattern, subName, consConfig, + WaitForCallbackValue<Consumer>(consumerPromise)); + Future<Result, Consumer> consumerFuture = consumerPromise.getFuture(); + result = consumerFuture.get(consumer); + ASSERT_EQ(ResultOk, result); + ASSERT_EQ(consumer.getSubscriptionName(), subName); + LOG_INFO("created pattern consumer with not match topics at beginning"); + + // 2. create 4 topics, in which 3 match the pattern. + std::string topicName1 = "persistent://prop/unit/ns2/patternTopicsAutoConsumerPubSub1"; + std::string topicName2 = "persistent://prop/unit/ns2/patternTopicsAutoConsumerPubSub2"; + std::string topicName3 = "persistent://prop/unit/ns2/patternTopicsAutoConsumerPubSub3"; + // This will not match pattern + std::string topicName4 = "persistent://prop/unit/ns2/patternMultiTopicsNotMatchPubSub4"; + + // call admin api to make topics partitioned + std::string url1 = + adminUrl + "admin/persistent/prop/unit/ns2/patternTopicsAutoConsumerPubSub1/partitions"; + std::string url2 = + adminUrl + "admin/persistent/prop/unit/ns2/patternTopicsAutoConsumerPubSub2/partitions"; + std::string url3 = + adminUrl + "admin/persistent/prop/unit/ns2/patternTopicsAutoConsumerPubSub3/partitions"; + std::string url4 = + adminUrl + "admin/persistent/prop/unit/ns2/patternMultiTopicsNotMatchPubSub4/partitions"; + + int res = makePutRequest(url1, "2"); + ASSERT_FALSE(res != 204 && res != 409); + res = makePutRequest(url2, "3"); + ASSERT_FALSE(res != 204 && res != 409); + res = makePutRequest(url3, "4"); + ASSERT_FALSE(res != 204 && res != 409); + res = makePutRequest(url4, "4"); + ASSERT_FALSE(res != 204 && res != 409); + + Producer producer1; + result = 
client.createProducer(topicName1, producer1); + ASSERT_EQ(ResultOk, result); + Producer producer2; + result = client.createProducer(topicName2, producer2); + ASSERT_EQ(ResultOk, result); + Producer producer3; + result = client.createProducer(topicName3, producer3); + ASSERT_EQ(ResultOk, result); + Producer producer4; + result = client.createProducer(topicName4, producer4); + ASSERT_EQ(ResultOk, result); + LOG_INFO("created 3 producers that match, with partitions: 2, 3, 4, and 1 producer not match"); + + // 3. wait enough time to trigger auto discovery + usleep(2 * 1000 * 1000); + + // 4. produce data. + int messageNumber = 100; + std::string msgContent = "msg-content"; + LOG_INFO("Publishing 100 messages by producer 1 synchronously"); + for (int msgNum = 0; msgNum < messageNumber; msgNum++) { + std::stringstream stream; + stream << msgContent << msgNum; + Message msg = MessageBuilder().setContent(stream.str()).build(); + ASSERT_EQ(ResultOk, producer1.send(msg)); + } + + msgContent = "msg-content2"; + LOG_INFO("Publishing 100 messages by producer 2 synchronously"); + for (int msgNum = 0; msgNum < messageNumber; msgNum++) { + std::stringstream stream; + stream << msgContent << msgNum; + Message msg = MessageBuilder().setContent(stream.str()).build(); + ASSERT_EQ(ResultOk, producer2.send(msg)); + } + + msgContent = "msg-content3"; + LOG_INFO("Publishing 100 messages by producer 3 synchronously"); + for (int msgNum = 0; msgNum < messageNumber; msgNum++) { + std::stringstream stream; + stream << msgContent << msgNum; + Message msg = MessageBuilder().setContent(stream.str()).build(); + ASSERT_EQ(ResultOk, producer3.send(msg)); + } + + msgContent = "msg-content4"; + LOG_INFO("Publishing 100 messages by producer 4 synchronously"); + for (int msgNum = 0; msgNum < messageNumber; msgNum++) { + std::stringstream stream; + stream << msgContent << msgNum; + Message msg = MessageBuilder().setContent(stream.str()).build(); + ASSERT_EQ(ResultOk, producer4.send(msg)); + } + + // 5. 
pattern consumer already subscribed 3 topics + LOG_INFO("Consuming and acking 300 messages by pattern topics consumer"); + for (int i = 0; i < 3 * messageNumber; i++) { + Message m; + ASSERT_EQ(ResultOk, consumer.receive(m, 1000)); + ASSERT_EQ(ResultOk, consumer.acknowledge(m)); + } + LOG_INFO("Consumed and acked 300 messages by pattern topics consumer"); + + // verify no more to receive, because producer4 not match pattern + Message m; + ASSERT_EQ(ResultTimeout, consumer.receive(m, 1000)); + + ASSERT_EQ(ResultOk, consumer.unsubscribe()); + + client.shutdown(); +} \ No newline at end of file diff --git a/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc b/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc index 67e6af53e9..f706eb8575 100644 --- a/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc +++ b/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc @@ -25,9 +25,12 @@ #include <Future.h> #include <Utils.h> #include "ConnectionPool.h" +#include "HttpHelper.h" #include <pulsar/Authentication.h> #include <boost/exception/all.hpp> +DECLARE_LOG_OBJECT() + using namespace pulsar; TEST(BinaryLookupServiceTest, basicLookup) { @@ -56,3 +59,67 @@ TEST(BinaryLookupServiceTest, basicLookup) { ASSERT_TRUE(lookupData != NULL); ASSERT_EQ(url, lookupData->getBrokerUrl()); } + +TEST(BinaryLookupServiceTest, basicGetNamespaceTopics) { + std::string url = "pulsar://localhost:8885"; + std::string adminUrl = "http://localhost:8765/"; + Result result; + // 1. create some topics under same namespace + Client client(url); + + std::string topicName1 = "persistent://prop/unit/ns4/basicGetNamespaceTopics1"; + std::string topicName2 = "persistent://prop/unit/ns4/basicGetNamespaceTopics2"; + std::string topicName3 = "persistent://prop/unit/ns4/basicGetNamespaceTopics3"; + // This is not in same namespace. 
+ std::string topicName4 = "persistent://prop/unit/ns2/basicGetNamespaceTopics4"; + + // call admin api to make topics partitioned + std::string url1 = adminUrl + "admin/persistent/prop/unit/ns4/basicGetNamespaceTopics1/partitions"; + std::string url2 = adminUrl + "admin/persistent/prop/unit/ns4/basicGetNamespaceTopics2/partitions"; + std::string url3 = adminUrl + "admin/persistent/prop/unit/ns4/basicGetNamespaceTopics3/partitions"; + + int res = makePutRequest(url1, "2"); + ASSERT_FALSE(res != 204 && res != 409); + res = makePutRequest(url2, "3"); + ASSERT_FALSE(res != 204 && res != 409); + res = makePutRequest(url3, "4"); + ASSERT_FALSE(res != 204 && res != 409); + + Producer producer1; + result = client.createProducer(topicName1, producer1); + ASSERT_EQ(ResultOk, result); + Producer producer2; + result = client.createProducer(topicName2, producer2); + ASSERT_EQ(ResultOk, result); + Producer producer3; + result = client.createProducer(topicName3, producer3); + ASSERT_EQ(ResultOk, result); + Producer producer4; + result = client.createProducer(topicName4, producer4); + ASSERT_EQ(ResultOk, result); + + // 2. call getTopicsOfNamespaceAsync + ExecutorServiceProviderPtr service = boost::make_shared<ExecutorServiceProvider>(1); + AuthenticationPtr authData = AuthFactory::Disabled(); + ClientConfiguration conf; + ExecutorServiceProviderPtr ioExecutorProvider_(boost::make_shared<ExecutorServiceProvider>(1)); + ConnectionPool pool_(conf, ioExecutorProvider_, authData, true); + BinaryProtoLookupService lookupService(pool_, url); + + TopicNamePtr topicName = TopicName::get(topicName1); + NamespaceNamePtr nsName = topicName->getNamespaceName(); + + Future<Result, NamespaceTopicsPtr> getTopicsFuture = lookupService.getTopicsOfNamespaceAsync(nsName); + NamespaceTopicsPtr topicsData; + result = getTopicsFuture.get(topicsData); + ASSERT_EQ(ResultOk, result); + ASSERT_TRUE(topicsData != NULL); + + // 3. 
verify result contains first 3 topic + ASSERT_EQ(topicsData->size(), 3); + ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName1) != topicsData->end()); + ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName2) != topicsData->end()); + ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName3) != topicsData->end()); + + client.shutdown(); +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services