This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a994d5daf16c5504db6658c37ea5d1b3043a88c6 Author: Yunze Xu <xyzinfern...@gmail.com> AuthorDate: Thu Jun 18 02:07:59 2020 +0800 Fix partition index error in close callback (#7282) (cherry picked from commit 72285f27755f961b61eb4d1eca891a6979044f50) --- pulsar-client-cpp/lib/PartitionedConsumerImpl.cc | 14 +++++++------- pulsar-client-cpp/lib/PartitionedProducerImpl.cc | 16 ++++++++-------- pulsar-client-cpp/lib/ProducerImpl.h | 2 ++ 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc index 88a2f0a..7f6d127 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc @@ -291,7 +291,7 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerCreated( partitionedConsumerCreatedPromise_.setFailed(result); // unsubscribed all of the successfully subscribed partitioned consumers closeAsync(nullCallbackForCleanup); - LOG_DEBUG("Unable to create Consumer for partition - " << partitionIndex << " Error - " << result); + LOG_ERROR("Unable to create Consumer for partition - " << partitionIndex << " Error - " << result); return; } @@ -350,17 +350,17 @@ void PartitionedConsumerImpl::closeAsync(ResultCallback callback) { return; } setState(Closed); - int consumerIndex = 0; unsigned int consumerAlreadyClosed = 0; // close successfully subscribed consumers // Here we don't need `consumersMutex` to protect `consumers_`, because `consumers_` can only be increased // when `state_` is Ready - for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) { - ConsumerImplPtr consumer = *i; + for (auto& consumer : consumers_) { if (!consumer->isClosed()) { - consumer->closeAsync(std::bind(&PartitionedConsumerImpl::handleSinglePartitionConsumerClose, - shared_from_this(), std::placeholders::_1, consumerIndex, - callback)); + auto self = shared_from_this(); + const auto partition = consumer->getPartitionIndex(); + consumer->closeAsync([this, self, partition, callback](Result result) { + handleSinglePartitionConsumerClose(result, partition, callback); + }); } else { if (++consumerAlreadyClosed == consumers_.size()) { // everything is closed already. so we are good. diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc index 628afbc..aa5e176 100644 --- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc @@ -131,7 +131,7 @@ void PartitionedProducerImpl::handleSinglePartitionProducerCreated(Result result lock.unlock(); closeAsync(closeCallback); partitionedProducerCreatedPromise_.setFailed(result); - LOG_DEBUG("Unable to create Producer for partition - " << partitionIndex << " Error - " << result); + LOG_ERROR("Unable to create Producer for partition - " << partitionIndex << " Error - " << result); return; } @@ -204,17 +204,17 @@ int64_t PartitionedProducerImpl::getLastSequenceId() const { void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) { setState(Closing); - int producerIndex = 0; unsigned int producerAlreadyClosed = 0; // Here we don't need `producersMutex` to protect `producers_`, because `producers_` can only be increased // when `state_` is Ready - for (ProducerList::const_iterator i = producers_.begin(); i != producers_.end(); i++) { - ProducerImplPtr prod = *i; - if (!prod->isClosed()) { - prod->closeAsync(std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerClose, - shared_from_this(), std::placeholders::_1, producerIndex, - closeCallback)); + for (auto& producer : producers_) { + if (!producer->isClosed()) { + auto self = shared_from_this(); + const auto partition = static_cast<unsigned int>(producer->partition()); + producer->closeAsync([this, self, partition, closeCallback](Result result) { + handleSinglePartitionProducerClose(result, partition, closeCallback); + }); } else { producerAlreadyClosed++; } diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h index 0a57da8d..25f628c 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.h +++ b/pulsar-client-cpp/lib/ProducerImpl.h @@ -90,6 +90,8 @@ class ProducerImpl : public HandlerBase, uint64_t getProducerId() const; + int32_t partition() const noexcept { return partition_; } + virtual void start(); virtual void shutdown();