This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a994d5daf16c5504db6658c37ea5d1b3043a88c6
Author: Yunze Xu <xyzinfern...@gmail.com>
AuthorDate: Thu Jun 18 02:07:59 2020 +0800

    Fix partition index error in close callback (#7282)
    
    
    (cherry picked from commit 72285f27755f961b61eb4d1eca891a6979044f50)
---
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc | 14 +++++++-------
 pulsar-client-cpp/lib/PartitionedProducerImpl.cc | 16 ++++++++--------
 pulsar-client-cpp/lib/ProducerImpl.h             |  2 ++
 3 files changed, 17 insertions(+), 15 deletions(-)

diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc 
b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 88a2f0a..7f6d127 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -291,7 +291,7 @@ void 
PartitionedConsumerImpl::handleSinglePartitionConsumerCreated(
         partitionedConsumerCreatedPromise_.setFailed(result);
         // unsubscribed all of the successfully subscribed partitioned 
consumers
         closeAsync(nullCallbackForCleanup);
-        LOG_DEBUG("Unable to create Consumer for partition - " << 
partitionIndex << " Error - " << result);
+        LOG_ERROR("Unable to create Consumer for partition - " << 
partitionIndex << " Error - " << result);
         return;
     }
 
@@ -350,17 +350,17 @@ void PartitionedConsumerImpl::closeAsync(ResultCallback 
callback) {
         return;
     }
     setState(Closed);
-    int consumerIndex = 0;
     unsigned int consumerAlreadyClosed = 0;
     // close successfully subscribed consumers
     // Here we don't need `consumersMutex` to protect `consumers_`, because 
`consumers_` can only be increased
     // when `state_` is Ready
-    for (ConsumerList::const_iterator i = consumers_.begin(); i != 
consumers_.end(); i++) {
-        ConsumerImplPtr consumer = *i;
+    for (auto& consumer : consumers_) {
         if (!consumer->isClosed()) {
-            
consumer->closeAsync(std::bind(&PartitionedConsumerImpl::handleSinglePartitionConsumerClose,
-                                           shared_from_this(), 
std::placeholders::_1, consumerIndex,
-                                           callback));
+            auto self = shared_from_this();
+            const auto partition = consumer->getPartitionIndex();
+            consumer->closeAsync([this, self, partition, callback](Result 
result) {
+                handleSinglePartitionConsumerClose(result, partition, 
callback);
+            });
         } else {
             if (++consumerAlreadyClosed == consumers_.size()) {
                 // everything is closed already. so we are good.
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc 
b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index 628afbc..aa5e176 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -131,7 +131,7 @@ void 
PartitionedProducerImpl::handleSinglePartitionProducerCreated(Result result
         lock.unlock();
         closeAsync(closeCallback);
         partitionedProducerCreatedPromise_.setFailed(result);
-        LOG_DEBUG("Unable to create Producer for partition - " << 
partitionIndex << " Error - " << result);
+        LOG_ERROR("Unable to create Producer for partition - " << 
partitionIndex << " Error - " << result);
         return;
     }
 
@@ -204,17 +204,17 @@ int64_t PartitionedProducerImpl::getLastSequenceId() 
const {
 void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) {
     setState(Closing);
 
-    int producerIndex = 0;
     unsigned int producerAlreadyClosed = 0;
 
     // Here we don't need `producersMutex` to protect `producers_`, because 
`producers_` can only be increased
     // when `state_` is Ready
-    for (ProducerList::const_iterator i = producers_.begin(); i != 
producers_.end(); i++) {
-        ProducerImplPtr prod = *i;
-        if (!prod->isClosed()) {
-            
prod->closeAsync(std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerClose,
-                                       shared_from_this(), 
std::placeholders::_1, producerIndex,
-                                       closeCallback));
+    for (auto& producer : producers_) {
+        if (!producer->isClosed()) {
+            auto self = shared_from_this();
+            const auto partition = static_cast<unsigned 
int>(producer->partition());
+            producer->closeAsync([this, self, partition, closeCallback](Result 
result) {
+                handleSinglePartitionProducerClose(result, partition, 
closeCallback);
+            });
         } else {
             producerAlreadyClosed++;
         }
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h 
b/pulsar-client-cpp/lib/ProducerImpl.h
index 0a57da8d..25f628c 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -90,6 +90,8 @@ class ProducerImpl : public HandlerBase,
 
     uint64_t getProducerId() const;
 
+    int32_t partition() const noexcept { return partition_; }
+
     virtual void start();
 
     virtual void shutdown();

Reply via email to