This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 54e529a  Fix ack failure on message listener in multi topics consumer 
(#447)
54e529a is described below

commit 54e529aaf82bddac063c847d4c11d3fba3acf0f3
Author: nkurihar <[email protected]>
AuthorDate: Fri Oct 11 15:44:23 2024 +0900

    Fix ack failure on message listener in multi topics consumer (#447)
---
 lib/ConsumerImpl.cc            | 14 ++++----------
 lib/MultiTopicsConsumerImpl.cc | 10 ++++++++++
 2 files changed, 14 insertions(+), 10 deletions(-)

diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index e5df421..289bd34 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -292,11 +292,7 @@ void ConsumerImpl::sendFlowPermitsToBroker(const 
ClientConnectionPtr& cnx, int n
 Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, 
Result result) {
     Result handleResult = ResultOk;
 
-    static bool firstTime = true;
     if (result == ResultOk) {
-        if (firstTime) {
-            firstTime = false;
-        }
         LOG_INFO(getName() << "Created consumer on broker " << 
cnx->cnxString());
         {
             Lock lock(mutex_);
@@ -313,12 +309,10 @@ Result ConsumerImpl::handleCreateConsumer(const 
ClientConnectionPtr& cnx, Result
         }
 
         LOG_DEBUG(getName() << "Send initial flow permits: " << 
config_.getReceiverQueueSize());
-        if (consumerTopicType_ == NonPartitioned || !firstTime) {
-            if (config_.getReceiverQueueSize() != 0) {
-                sendFlowPermitsToBroker(cnx, config_.getReceiverQueueSize());
-            } else if (messageListener_) {
-                sendFlowPermitsToBroker(cnx, 1);
-            }
+        if (config_.getReceiverQueueSize() != 0) {
+            sendFlowPermitsToBroker(cnx, config_.getReceiverQueueSize());
+        } else if (messageListener_) {
+            sendFlowPermitsToBroker(cnx, 1);
         }
         consumerCreatedPromise_.setValue(get_shared_this_ptr());
     } else {
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index e95a9ac..dddade5 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -149,6 +149,11 @@ void 
MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c
         if (state_.compare_exchange_strong(state, Ready)) {
             LOG_INFO("Successfully Subscribed to Topics");
             multiTopicsConsumerCreatedPromise_.setValue(get_shared_this_ptr());
+            // Now all child topics are successfully subscribed, start 
messageListeners
+            if (messageListener_ && !conf_.isStartPaused()) {
+                LOG_INFO("Start messageListeners");
+                resumeMessageListener();
+            }
         } else {
             LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " 
Error - " << result);
             // unsubscribed all of the successfully subscribed partitioned 
consumers
@@ -205,6 +210,11 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int 
numPartitions, TopicN
                                                        
ConsumerSubResultPromisePtr topicSubResultPromise) {
     std::shared_ptr<ConsumerImpl> consumer;
     ConsumerConfiguration config = conf_.clone();
+    // Pause messageListener until all child topics are subscribed.
+    // Otherwise messages may be acked before the parent consumer gets 
"Ready", causing ack failures.
+    if (messageListener_) {
+        config.setStartPaused(true);
+    }
     auto client = client_.lock();
     if (!client) {
         topicSubResultPromise->setFailed(ResultAlreadyClosed);

Reply via email to