This is an automated email from the ASF dual-hosted git repository. zhaijia pushed a commit to branch branch-2.5 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 74b3564a7060041c0091ed31a794a8d190330614 Author: k2la <[email protected]> AuthorDate: Tue Mar 3 04:55:33 2020 +0900 [Issue 6168] Fix Unacked Message Tracker by Using Time Partition on C++ (#6391) ### Motivation Fix #6168. >On C++ lib, like the following log, unacked messages are redelivered after about 2 * unAckedMessagesTimeout. ### Modifications As in #3118, fix `UnackedMessageTracker` by using time partitions. - Add `TickDurationInMs` - Add a `redeliverUnacknowledgedMessages` overload, which takes a set of `MessageId`s, to `ConsumerImpl`, `MultiTopicsConsumerImpl` and `PartitionedConsumerImpl`. (cherry picked from commit 333888ad61c062f9e3d2946918ffc21fafd441af) --- .../include/pulsar/ConsumerConfiguration.h | 4 + .../include/pulsar/c/producer_configuration.h | 2 +- pulsar-client-cpp/lib/ConsumerConfiguration.cc | 6 ++ pulsar-client-cpp/lib/ConsumerConfigurationImpl.h | 2 + pulsar-client-cpp/lib/ConsumerImpl.cc | 21 +++- pulsar-client-cpp/lib/ConsumerImpl.h | 1 + pulsar-client-cpp/lib/ConsumerImplBase.h | 1 + pulsar-client-cpp/lib/LogUtils.cc | 2 +- pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 25 ++++- pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h | 1 + pulsar-client-cpp/lib/PartitionedConsumerImpl.cc | 24 ++++- pulsar-client-cpp/lib/PartitionedConsumerImpl.h | 1 + .../lib/UnAckedMessageTrackerEnabled.cc | 106 +++++++++++++-------- .../lib/UnAckedMessageTrackerEnabled.h | 6 +- 14 files changed, 152 insertions(+), 50 deletions(-) diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h index 08b7c54..5468b37 100644 --- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h +++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h @@ -155,6 +155,10 @@ class PULSAR_PUBLIC ConsumerConfiguration { */ long getUnAckedMessagesTimeoutMs() const; + void setTickDurationInMs(const uint64_t milliSeconds); + + long getTickDurationInMs() const; + /** * Set the delay to wait before re-delivering 
messages that have failed to be process. * <p> diff --git a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h index c846451..1fe44e0 100644 --- a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h +++ b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h @@ -178,4 +178,4 @@ PULSAR_PUBLIC void pulsar_producer_configuration_set_property(pulsar_producer_co #ifdef __cplusplus } -#endif \ No newline at end of file +#endif diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc b/pulsar-client-cpp/lib/ConsumerConfiguration.cc index 38fa1fe..546b8b9 100644 --- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc +++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc @@ -98,6 +98,12 @@ void ConsumerConfiguration::setUnAckedMessagesTimeoutMs(const uint64_t milliSeco impl_->unAckedMessagesTimeoutMs = milliSeconds; } +long ConsumerConfiguration::getTickDurationInMs() const { return impl_->tickDurationInMs; } + +void ConsumerConfiguration::setTickDurationInMs(const uint64_t milliSeconds) { + impl_->tickDurationInMs = milliSeconds; +} + void ConsumerConfiguration::setNegativeAckRedeliveryDelayMs(long redeliveryDelayMillis) { impl_->negativeAckRedeliveryDelayMs = redeliveryDelayMillis; } diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h index 55dafd3..8dd1263 100644 --- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h +++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h @@ -27,6 +27,7 @@ namespace pulsar { struct ConsumerConfigurationImpl { SchemaInfo schemaInfo; long unAckedMessagesTimeoutMs; + long tickDurationInMs; long negativeAckRedeliveryDelayMs; ConsumerType consumerType; @@ -45,6 +46,7 @@ struct ConsumerConfigurationImpl { ConsumerConfigurationImpl() : schemaInfo(), unAckedMessagesTimeoutMs(0), + tickDurationInMs(1000), negativeAckRedeliveryDelayMs(60000), consumerType(ConsumerExclusive), messageListener(), diff 
--git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc index 9a7e506..a58e09b 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.cc +++ b/pulsar-client-cpp/lib/ConsumerImpl.cc @@ -66,8 +66,13 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " << consumerId_ << "] "; consumerStr_ = consumerStrStream.str(); if (conf.getUnAckedMessagesTimeoutMs() != 0) { - unAckedMessageTrackerPtr_.reset( - new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this)); + if (conf.getTickDurationInMs() > 0) { + unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled( + conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this)); + } else { + unAckedMessageTrackerPtr_.reset( + new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this)); + } } else { unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled()); } @@ -954,6 +959,18 @@ Result ConsumerImpl::resumeMessageListener() { void ConsumerImpl::redeliverUnacknowledgedMessages() { static std::set<MessageId> emptySet; redeliverMessages(emptySet); + unAckedMessageTrackerPtr_->clear(); +} + +void ConsumerImpl::redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) { + if (messageIds.empty()) { + return; + } + if (config_.getConsumerType() != ConsumerShared && config_.getConsumerType() != ConsumerKeyShared) { + redeliverUnacknowledgedMessages(); + return; + } + redeliverMessages(messageIds); } void ConsumerImpl::redeliverMessages(const std::set<MessageId>& messageIds) { diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h index 8a25b49..6d81fd0 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.h +++ b/pulsar-client-cpp/lib/ConsumerImpl.h @@ -98,6 +98,7 @@ class ConsumerImpl : public ConsumerImplBase, virtual bool isCumulativeAcknowledgementAllowed(ConsumerType consumerType); 
virtual void redeliverMessages(const std::set<MessageId>& messageIds); + virtual void redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds); virtual void negativeAcknowledge(const MessageId& msgId); virtual void closeAsync(ResultCallback callback); diff --git a/pulsar-client-cpp/lib/ConsumerImplBase.h b/pulsar-client-cpp/lib/ConsumerImplBase.h index ab6ed9f..fc15066 100644 --- a/pulsar-client-cpp/lib/ConsumerImplBase.h +++ b/pulsar-client-cpp/lib/ConsumerImplBase.h @@ -48,6 +48,7 @@ class ConsumerImplBase { virtual Result pauseMessageListener() = 0; virtual Result resumeMessageListener() = 0; virtual void redeliverUnacknowledgedMessages() = 0; + virtual void redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) = 0; virtual const std::string& getName() const = 0; virtual int getNumOfPrefetchedMessages() const = 0; virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) = 0; diff --git a/pulsar-client-cpp/lib/LogUtils.cc b/pulsar-client-cpp/lib/LogUtils.cc index e2615a5..e4f6a17 100644 --- a/pulsar-client-cpp/lib/LogUtils.cc +++ b/pulsar-client-cpp/lib/LogUtils.cc @@ -55,4 +55,4 @@ std::string LogUtils::getLoggerName(const std::string& path) { return path.substr(startIdx + 1, endIdx - startIdx - 1); } -} // namespace pulsar \ No newline at end of file +} // namespace pulsar diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc index 25addb9..e837adf 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc @@ -45,8 +45,13 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std consumerStr_ = consumerStrStream.str(); if (conf.getUnAckedMessagesTimeoutMs() != 0) { - unAckedMessageTrackerPtr_.reset( - new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this)); + if (conf.getTickDurationInMs() > 0) { + unAckedMessageTrackerPtr_.reset(new 
UnAckedMessageTrackerEnabled( + conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this)); + } else { + unAckedMessageTrackerPtr_.reset( + new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this)); + } } else { unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled()); } @@ -654,6 +659,22 @@ void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages() { consumer++) { (consumer->second)->redeliverUnacknowledgedMessages(); } + unAckedMessageTrackerPtr_->clear(); +} + +void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) { + if (messageIds.empty()) { + return; + } + if (conf_.getConsumerType() != ConsumerShared && conf_.getConsumerType() != ConsumerKeyShared) { + redeliverUnacknowledgedMessages(); + return; + } + LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for partitioned consumer."); + for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end(); + consumer++) { + (consumer->second)->redeliverUnacknowledgedMessages(messageIds); + } } int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return messages_.size(); } diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h index d190664..fa271fe 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h @@ -69,6 +69,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase, virtual Result pauseMessageListener(); virtual Result resumeMessageListener(); virtual void redeliverUnacknowledgedMessages(); + virtual void redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds); virtual int getNumOfPrefetchedMessages() const; virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback); void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, MultiTopicsBrokerConsumerStatsPtr, diff --git 
a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc index 03fd2d2..0241a54 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc @@ -43,8 +43,13 @@ PartitionedConsumerImpl::PartitionedConsumerImpl(ClientImplPtr client, const std consumerStrStream << "[Partitioned Consumer: " << topic_ << "," << subscriptionName << "," << numPartitions << "]"; if (conf.getUnAckedMessagesTimeoutMs() != 0) { - unAckedMessageTrackerPtr_.reset( - new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this)); + if (conf.getTickDurationInMs() > 0) { + unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled( + conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this)); + } else { + unAckedMessageTrackerPtr_.reset( + new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this)); + } } else { unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled()); } @@ -426,6 +431,21 @@ void PartitionedConsumerImpl::redeliverUnacknowledgedMessages() { for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) { (*i)->redeliverUnacknowledgedMessages(); } + unAckedMessageTrackerPtr_->clear(); +} + +void PartitionedConsumerImpl::redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) { + if (messageIds.empty()) { + return; + } + if (conf_.getConsumerType() != ConsumerShared && conf_.getConsumerType() != ConsumerKeyShared) { + redeliverUnacknowledgedMessages(); + return; + } + LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for partitioned consumer."); + for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) { + (*i)->redeliverUnacknowledgedMessages(messageIds); + } } const std::string& PartitionedConsumerImpl::getName() const { return partitionStr_; } diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h 
b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h index 71b2a30..fb4b047 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h @@ -64,6 +64,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase, virtual Result pauseMessageListener(); virtual Result resumeMessageListener(); virtual void redeliverUnacknowledgedMessages(); + virtual void redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds); virtual const std::string& getName() const; virtual int getNumOfPrefetchedMessages() const; virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback); diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc index 2c768b2..7894e64 100644 --- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc +++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc @@ -28,7 +28,7 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() { timeoutHandlerHelper(); ExecutorServicePtr executorService = client_->getIOExecutorProvider()->get(); timer_ = executorService->createDeadlineTimer(); - timer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs_)); + timer_->expires_from_now(boost::posix_time::milliseconds(tickDurationInMs_)); timer_->async_wait([&](const boost::system::error_code& ec) { if (ec) { LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]"); @@ -42,86 +42,112 @@ void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() { std::lock_guard<std::mutex> acquire(lock_); LOG_DEBUG("UnAckedMessageTrackerEnabled::timeoutHandlerHelper invoked for consumerPtr_ " << consumerReference_.getName().c_str()); - if (!oldSet_.empty()) { + + std::set<MessageId> headPartition = timePartitions.front(); + timePartitions.pop_front(); + + std::set<MessageId> msgIdsToRedeliver; + if (!headPartition.empty()) { LOG_INFO(consumerReference_.getName().c_str() - << ": " << oldSet_.size() << " Messages were not acked within " 
<< timeoutMs_ << " time"); - oldSet_.clear(); - currentSet_.clear(); - consumerReference_.redeliverUnacknowledgedMessages(); + << ": " << headPartition.size() << " Messages were not acked within " + << timePartitions.size() * tickDurationInMs_ << " time"); + for (auto it = headPartition.begin(); it != headPartition.end(); it++) { + msgIdsToRedeliver.insert(*it); + messageIdPartitionMap.erase(*it); + } + } + headPartition.clear(); + timePartitions.push_back(headPartition); + + if (msgIdsToRedeliver.size() > 0) { + consumerReference_.redeliverUnacknowledgedMessages(msgIdsToRedeliver); } - oldSet_.swap(currentSet_); } UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, const ClientImplPtr client, ConsumerImplBase& consumer) : consumerReference_(consumer) { + UnAckedMessageTrackerEnabled(timeoutMs, timeoutMs, client, consumer); +} + +UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, long tickDurationInMs, + const ClientImplPtr client, + ConsumerImplBase& consumer) + : consumerReference_(consumer) { timeoutMs_ = timeoutMs; + tickDurationInMs_ = (timeoutMs >= tickDurationInMs) ? 
tickDurationInMs : timeoutMs; client_ = client; + + int blankPartitions = (int)std::ceil((double)timeoutMs_ / tickDurationInMs_); + for (int i = 0; i < blankPartitions + 1; i++) { + std::set<MessageId> msgIds; + timePartitions.push_back(msgIds); + } + timeoutHandler(); } bool UnAckedMessageTrackerEnabled::add(const MessageId& m) { std::lock_guard<std::mutex> acquire(lock_); - oldSet_.erase(m); - return currentSet_.insert(m).second; + if (messageIdPartitionMap.count(m) == 0) { + bool insert = messageIdPartitionMap.insert(std::make_pair(m, timePartitions.back())).second; + return insert && timePartitions.back().insert(m).second; + } + return false; } bool UnAckedMessageTrackerEnabled::isEmpty() { std::lock_guard<std::mutex> acquire(lock_); - return oldSet_.empty() && currentSet_.empty(); + return messageIdPartitionMap.empty(); } bool UnAckedMessageTrackerEnabled::remove(const MessageId& m) { std::lock_guard<std::mutex> acquire(lock_); - return oldSet_.erase(m) || currentSet_.erase(m); + bool removed = false; + std::map<MessageId, std::set<MessageId>>::iterator exist = messageIdPartitionMap.find(m); + if (exist != messageIdPartitionMap.end()) { + removed = exist->second.erase(m); + } + return removed; } long UnAckedMessageTrackerEnabled::size() { std::lock_guard<std::mutex> acquire(lock_); - return oldSet_.size() + currentSet_.size(); + return messageIdPartitionMap.size(); } void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) { std::lock_guard<std::mutex> acquire(lock_); - for (std::set<MessageId>::iterator it = oldSet_.begin(); it != oldSet_.end();) { - if (*it < msgId && it->partition() == msgId.partition()) { - oldSet_.erase(it++); - } else { - it++; - } - } - for (std::set<MessageId>::iterator it = currentSet_.begin(); it != currentSet_.end();) { - if (*it < msgId && it->partition() == msgId.partition()) { - currentSet_.erase(it++); - } else { - it++; + for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end(); it++) 
{ + MessageId msgIdInMap = it->first; + if (msgIdInMap < msgId) { + std::map<MessageId, std::set<MessageId>>::iterator exist = messageIdPartitionMap.find(msgId); + if (exist != messageIdPartitionMap.end()) { + exist->second.erase(msgId); + } } } } // this is only for MultiTopicsConsumerImpl, when un-subscribe a single topic, should remove all it's message. void UnAckedMessageTrackerEnabled::removeTopicMessage(const std::string& topic) { - for (std::set<MessageId>::iterator it = oldSet_.begin(); it != oldSet_.end();) { - const std::string& topicPartitionName = it->getTopicName(); - if (topicPartitionName.find(topic) != std::string::npos) { - oldSet_.erase(it++); - } else { - it++; - } - } - for (std::set<MessageId>::iterator it = currentSet_.begin(); it != currentSet_.end();) { - const std::string& topicPartitionName = it->getTopicName(); - if (topicPartitionName.find(topic) != std::string::npos) { - currentSet_.erase(it++); - } else { - it++; + std::lock_guard<std::mutex> acquire(lock_); + for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end(); it++) { + MessageId msgIdInMap = it->first; + if (msgIdInMap.getTopicName().compare(topic) == 0) { + std::map<MessageId, std::set<MessageId>>::iterator exist = messageIdPartitionMap.find(msgIdInMap); + if (exist != messageIdPartitionMap.end()) { + exist->second.erase(msgIdInMap); + } } } } void UnAckedMessageTrackerEnabled::clear() { - currentSet_.clear(); - oldSet_.clear(); + messageIdPartitionMap.clear(); + for (auto it = timePartitions.begin(); it != timePartitions.end(); it++) { + it->clear(); + } } UnAckedMessageTrackerEnabled::~UnAckedMessageTrackerEnabled() { diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h index 921e747..c2b4012 100644 --- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h +++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h @@ -28,6 +28,7 @@ class UnAckedMessageTrackerEnabled : public 
UnAckedMessageTrackerInterface { public: ~UnAckedMessageTrackerEnabled(); UnAckedMessageTrackerEnabled(long timeoutMs, const ClientImplPtr, ConsumerImplBase&); + UnAckedMessageTrackerEnabled(long timeoutMs, long tickDuration, const ClientImplPtr, ConsumerImplBase&); bool add(const MessageId& m); bool remove(const MessageId& m); void removeMessagesTill(const MessageId& msgId); @@ -40,13 +41,14 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface { void timeoutHandlerHelper(); bool isEmpty(); long size(); - std::set<MessageId> currentSet_; - std::set<MessageId> oldSet_; + std::map<MessageId, std::set<MessageId>> messageIdPartitionMap; + std::deque<std::set<MessageId>> timePartitions; std::mutex lock_; DeadlineTimerPtr timer_; ConsumerImplBase& consumerReference_; ClientImplPtr client_; long timeoutMs_; + long tickDurationInMs_; }; } // namespace pulsar
