This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 54e529a Fix ack failure on message listener in multi topics consumer (#447)
54e529a is described below
commit 54e529aaf82bddac063c847d4c11d3fba3acf0f3
Author: nkurihar <[email protected]>
AuthorDate: Fri Oct 11 15:44:23 2024 +0900
Fix ack failure on message listener in multi topics consumer (#447)
---
lib/ConsumerImpl.cc | 14 ++++----------
lib/MultiTopicsConsumerImpl.cc | 10 ++++++++++
2 files changed, 14 insertions(+), 10 deletions(-)
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index e5df421..289bd34 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -292,11 +292,7 @@ void ConsumerImpl::sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int n
Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result) {
Result handleResult = ResultOk;
- static bool firstTime = true;
if (result == ResultOk) {
- if (firstTime) {
- firstTime = false;
- }
LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString());
{
Lock lock(mutex_);
@@ -313,12 +309,10 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result
}
LOG_DEBUG(getName() << "Send initial flow permits: " << config_.getReceiverQueueSize());
- if (consumerTopicType_ == NonPartitioned || !firstTime) {
- if (config_.getReceiverQueueSize() != 0) {
- sendFlowPermitsToBroker(cnx, config_.getReceiverQueueSize());
- } else if (messageListener_) {
- sendFlowPermitsToBroker(cnx, 1);
- }
+ if (config_.getReceiverQueueSize() != 0) {
+ sendFlowPermitsToBroker(cnx, config_.getReceiverQueueSize());
+ } else if (messageListener_) {
+ sendFlowPermitsToBroker(cnx, 1);
}
consumerCreatedPromise_.setValue(get_shared_this_ptr());
} else {
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index e95a9ac..dddade5 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -149,6 +149,11 @@ void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c
if (state_.compare_exchange_strong(state, Ready)) {
LOG_INFO("Successfully Subscribed to Topics");
multiTopicsConsumerCreatedPromise_.setValue(get_shared_this_ptr());
+ // Now all child topics are successfully subscribed, start messageListeners
+ if (messageListener_ && !conf_.isStartPaused()) {
+ LOG_INFO("Start messageListeners");
+ resumeMessageListener();
+ }
} else {
LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result);
// unsubscribed all of the successfully subscribed partitioned consumers
@@ -205,6 +210,11 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
ConsumerSubResultPromisePtr topicSubResultPromise) {
std::shared_ptr<ConsumerImpl> consumer;
ConsumerConfiguration config = conf_.clone();
+ // Pause messageListener until all child topics are subscribed.
+ // Otherwise messages may be acked before the parent consumer gets "Ready", causing ack failures.
+ if (messageListener_) {
+ config.setStartPaused(true);
+ }
auto client = client_.lock();
if (!client) {
topicSubResultPromise->setFailed(ResultAlreadyClosed);