This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fb6b4ea  [C++] Fix paused zero queue consumer still pre-fetches messages (#10036)
fb6b4ea is described below

commit fb6b4ea04dda003f35007c180a1edc22ae619591
Author: Yunze Xu <xyzinfern...@163.com>
AuthorDate: Sat Mar 27 06:48:18 2021 +0800

    [C++] Fix paused zero queue consumer still pre-fetches messages (#10036)
    
    * Fix zero queue consumer pre-fetches messages after paused
    
    * Remove unused logs and checks
    
    * Fix pulsar image build failure
    
    * Fix AtomicHelper functions
    
    * Use fetch_add as a substitute of addAndGet
---
 pulsar-client-cpp/lib/ConsumerImpl.cc            |  57 +++++-------
 pulsar-client-cpp/lib/ConsumerImpl.h             |  12 +--
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc |   2 +-
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc |   2 +-
 pulsar-client-cpp/tests/ZeroQueueSizeTest.cc     | 107 +++++++++++++++++++++++
 5 files changed, 136 insertions(+), 44 deletions(-)

diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 47250c8..4d64d24 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -54,7 +54,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const 
std::string& topic,
       // This is the initial capacity of the queue
       incomingMessages_(std::max(config_.getReceiverQueueSize(), 1)),
       pendingReceives_(),
-      availablePermits_(conf.getReceiverQueueSize()),
+      availablePermits_(0),
+      receiverQueueRefillThreshold_(config_.getReceiverQueueSize() / 2),
       consumerId_(client->newConsumerId()),
       consumerName_(config_.getConsumerName()),
       partitionIndex_(-1),
@@ -202,9 +203,12 @@ void ConsumerImpl::connectionFailed(Result result) {
     }
 }
 
-void ConsumerImpl::receiveMessages(const ClientConnectionPtr& cnx, unsigned 
int count) {
-    SharedBuffer cmd = Commands::newFlow(consumerId_, count);
-    cnx->sendCommand(cmd);
+void ConsumerImpl::sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int 
numMessages) {
+    if (cnx) {
+        LOG_DEBUG(getName() << "Send more permits: " << numMessages);
+        SharedBuffer cmd = Commands::newFlow(consumerId_, static_cast<unsigned 
int>(numMessages));
+        cnx->sendCommand(cmd);
+    }
 }
 
 void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result 
result) {
@@ -223,7 +227,7 @@ void ConsumerImpl::handleCreateConsumer(const 
ClientConnectionPtr& cnx, Result r
             backoff_.reset();
             // Complicated logic since we don't have a isLocked() function for 
mutex
             if (waitingForZeroQueueSizeMessage) {
-                receiveMessages(cnx, 1);
+                sendFlowPermitsToBroker(cnx, 1);
             }
             availablePermits_ = 0;
         }
@@ -231,9 +235,9 @@ void ConsumerImpl::handleCreateConsumer(const 
ClientConnectionPtr& cnx, Result r
         LOG_DEBUG(getName() << "Send initial flow permits: " << 
config_.getReceiverQueueSize());
         if (consumerTopicType_ == NonPartitioned || !firstTime) {
             if (config_.getReceiverQueueSize() != 0) {
-                receiveMessages(cnx, config_.getReceiverQueueSize());
+                sendFlowPermitsToBroker(cnx, config_.getReceiverQueueSize());
             } else if (messageListener_) {
-                receiveMessages(cnx, 1);
+                sendFlowPermitsToBroker(cnx, 1);
             }
         }
         consumerCreatedPromise_.setValue(shared_from_this());
@@ -380,11 +384,9 @@ void ConsumerImpl::messageReceived(const 
ClientConnectionPtr& cnx, const proto::
     }
 
     if (messageListener_) {
-        Lock lock(messageListenerMutex_);
         if (!messageListenerRunning_) {
             return;
         }
-        lock.unlock();
         // Trigger message listener callback in a separate thread
         while (numOfMessageReceived--) {
             
listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, 
shared_from_this()));
@@ -556,11 +558,9 @@ void ConsumerImpl::discardCorruptedMessage(const 
ClientConnectionPtr& cnx,
 }
 
 void ConsumerImpl::internalListener() {
-    Lock lock(messageListenerMutex_);
     if (!messageListenerRunning_) {
         return;
     }
-    lock.unlock();
     Message msg;
     if (!incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
         // This will only happen when the connection got reset and we cleared 
the queue
@@ -596,10 +596,7 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message& 
msg) {
     waitingForZeroQueueSizeMessage = true;
     localLock.unlock();
 
-    if (currentCnx) {
-        LOG_DEBUG(getName() << "Send more permits: " << 1);
-        receiveMessages(currentCnx, 1);
-    }
+    sendFlowPermitsToBroker(currentCnx, 1);
 
     while (true) {
         incomingMessages_.pop(msg);
@@ -646,11 +643,7 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& callback) 
{
         lock.unlock();
 
         if (config_.getReceiverQueueSize() == 0) {
-            ClientConnectionPtr currentCnx = getCnx().lock();
-            if (currentCnx) {
-                LOG_DEBUG(getName() << "Send more permits: " << 1);
-                receiveMessages(currentCnx, 1);
-            }
+            sendFlowPermitsToBroker(getCnx().lock(), 1);
         }
     }
 }
@@ -752,20 +745,13 @@ Optional<MessageId> ConsumerImpl::clearReceiveQueue() {
     }
 }
 
-void ConsumerImpl::increaseAvailablePermits(const ClientConnectionPtr& 
currentCnx, int numberOfPermits) {
-    int additionalPermits = 0;
+void ConsumerImpl::increaseAvailablePermits(const ClientConnectionPtr& 
currentCnx, int delta) {
+    int newAvailablePermits = availablePermits_.fetch_add(delta) + delta;
 
-    availablePermits_ += numberOfPermits;
-    if (availablePermits_ >= config_.getReceiverQueueSize() / 2) {
-        additionalPermits = availablePermits_;
-        availablePermits_ = 0;
-    }
-    if (additionalPermits > 0) {
-        if (currentCnx) {
-            LOG_DEBUG(getName() << "Send more permits: " << additionalPermits);
-            receiveMessages(currentCnx, additionalPermits);
-        } else {
-            LOG_DEBUG(getName() << "Connection is not ready, Unable to send 
flow Command");
+    while (newAvailablePermits >= receiverQueueRefillThreshold_ && 
messageListenerRunning_) {
+        if (availablePermits_.compare_exchange_weak(newAvailablePermits, 0)) {
+            sendFlowPermitsToBroker(currentCnx, newAvailablePermits);
+            break;
         }
     }
 }
@@ -972,7 +958,6 @@ Result ConsumerImpl::pauseMessageListener() {
     if (!messageListener_) {
         return ResultInvalidConfiguration;
     }
-    Lock lock(messageListenerMutex_);
     messageListenerRunning_ = false;
     return ResultOk;
 }
@@ -982,19 +967,19 @@ Result ConsumerImpl::resumeMessageListener() {
         return ResultInvalidConfiguration;
     }
 
-    Lock lock(messageListenerMutex_);
     if (messageListenerRunning_) {
         // Not paused
         return ResultOk;
     }
     messageListenerRunning_ = true;
     const size_t count = incomingMessages_.size();
-    lock.unlock();
 
     for (size_t i = 0; i < count; i++) {
         // Trigger message listener callback in a separate thread
         listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, 
shared_from_this()));
     }
+    // Check current permits and determine whether to send FLOW command
+    this->increaseAvailablePermits(getCnx().lock(), 0);
     return ResultOk;
 }
 
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index 313f1d9..ea67893 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -43,6 +43,7 @@
 #include <lib/stats/ConsumerStatsImpl.h>
 #include <lib/stats/ConsumerStatsDisabled.h>
 #include <queue>
+#include <atomic>
 
 using namespace pulsar;
 
@@ -73,11 +74,10 @@ class ConsumerImpl : public ConsumerImplBase,
     ~ConsumerImpl();
     void setPartitionIndex(int partitionIndex);
     int getPartitionIndex();
-    void receiveMessages(const ClientConnectionPtr& cnx, unsigned int count);
+    void sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int 
numMessages);
     uint64_t getConsumerId();
     void messageReceived(const ClientConnectionPtr& cnx, const 
proto::CommandMessage& msg,
                          bool& isChecksumValid, proto::MessageMetadata& 
msgMetadata, SharedBuffer& payload);
-    int incrementAndGetPermits(uint64_t cnxSequenceId);
     void messageProcessed(Message& msg);
     inline proto::CommandSubscribe_SubType getSubType();
     inline proto::CommandSubscribe_InitialPosition getInitialPosition();
@@ -150,7 +150,7 @@ class ConsumerImpl : public ConsumerImplBase,
                                    const proto::MessageMetadata& metadata, 
SharedBuffer& payload);
     void discardCorruptedMessage(const ClientConnectionPtr& cnx, const 
proto::MessageIdData& messageId,
                                  proto::CommandAck::ValidationError 
validationError);
-    void increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int 
numberOfPermits = 1);
+    void increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int 
delta = 1);
     void drainIncomingMessageQueue(size_t count);
     uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr& 
cnx, Message& batchedMessage,
                                                 int redeliveryCount);
@@ -185,14 +185,14 @@ class ConsumerImpl : public ConsumerImplBase,
     Optional<MessageId> lastDequedMessage_;
     UnboundedBlockingQueue<Message> incomingMessages_;
     std::queue<ReceiveCallback> pendingReceives_;
-    int availablePermits_;
+    std::atomic_int availablePermits_;
+    const int receiverQueueRefillThreshold_;
     uint64_t consumerId_;
     std::string consumerName_;
     std::string consumerStr_;
     int32_t partitionIndex_;
     Promise<Result, ConsumerImplBaseWeakPtr> consumerCreatedPromise_;
-    bool messageListenerRunning_;
-    std::mutex messageListenerMutex_;
+    std::atomic_bool messageListenerRunning_;
     CompressionCodecProvider compressionCodecProvider_;
     UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
     BatchAcknowledgementTracker batchAcknowledgementTracker_;
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index b62083a..46660e7 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -613,7 +613,7 @@ void MultiTopicsConsumerImpl::receiveMessages() {
     for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer 
!= consumers_.end();
          consumer++) {
         ConsumerImplPtr consumerPtr = consumer->second;
-        consumerPtr->receiveMessages(consumerPtr->getCnx().lock(), 
conf_.getReceiverQueueSize());
+        consumerPtr->sendFlowPermitsToBroker(consumerPtr->getCnx().lock(), 
conf_.getReceiverQueueSize());
         LOG_DEBUG("Sending FLOW command for consumer - " << 
consumerPtr->getConsumerId());
     }
 }
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 8881a0d..5571a6c 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -454,7 +454,7 @@ void PartitionedConsumerImpl::internalListener(Consumer 
consumer) {
 void PartitionedConsumerImpl::receiveMessages() {
     for (ConsumerList::const_iterator i = consumers_.begin(); i != 
consumers_.end(); i++) {
         ConsumerImplPtr consumer = *i;
-        consumer->receiveMessages(consumer->getCnx().lock(), 
conf_.getReceiverQueueSize());
+        consumer->sendFlowPermitsToBroker(consumer->getCnx().lock(), 
conf_.getReceiverQueueSize());
         LOG_DEBUG("Sending FLOW command for consumer - " << 
consumer->getConsumerId());
     }
 }
diff --git a/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc b/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc
index 53b6c9d..6d6c933 100644
--- a/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc
+++ b/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc
@@ -20,7 +20,12 @@
 #include <pulsar/Client.h>
 #include <lib/Latch.h>
 #include "ConsumerTest.h"
+#include <atomic>
+#include <chrono>
+#include <condition_variable>
 #include <functional>
+#include <memory>
+#include <mutex>
 
 DECLARE_LOG_OBJECT()
 
@@ -116,3 +121,105 @@ TEST(ZeroQueueSizeTest, testMessageListener) {
     producer.close();
     client.close();
 }
+
+static ConsumerConfiguration zeroQueueSharedConsumerConf(
+    const std::string& name, std::function<void(Consumer, const Message&)> 
callback) {
+    ConsumerConfiguration conf;
+    conf.setConsumerType(ConsumerShared);
+    conf.setReceiverQueueSize(0);
+    conf.setSubscriptionInitialPosition(InitialPositionEarliest);
+    conf.setMessageListener([name, callback](Consumer consumer, const Message& 
msg) {
+        LOG_INFO(name << " received " << msg.getDataAsString() << " from " << 
msg.getMessageId());
+        callback(consumer, msg);
+    });
+    return conf;
+}
+
+class IntVector {
+   public:
+    size_t add(int i) {
+        std::lock_guard<std::mutex> lock(mutex_);
+        data_.emplace_back(i);
+        return data_.size();
+    }
+
+    std::vector<int> data() const {
+        std::lock_guard<std::mutex> lock(mutex_);
+        return data_;
+    }
+
+   private:
+    std::vector<int> data_;
+    mutable std::mutex mutex_;
+};
+
+TEST(ZeroQueueSizeTest, testPauseResume) {
+    Client client(lookupUrl);
+    const auto topic = "ZeroQueueSizeTestPauseListener-" + 
std::to_string(time(nullptr));
+    const auto subscription = "my-sub";
+
+    auto intToMessage = [](int i) { return 
MessageBuilder().setContent(std::to_string(i)).build(); };
+    auto messageToInt = [](const Message& msg) { return 
std::stoi(msg.getDataAsString()); };
+
+    // 1. Produce 10 messages
+    Producer producer;
+    const auto producerConf = 
ProducerConfiguration().setBatchingEnabled(false);
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf, producer));
+    for (int i = 0; i < 10; i++) {
+        MessageId id;
+        ASSERT_EQ(ResultOk, producer.send(intToMessage(i), id));
+        LOG_INFO("Send " << i << " to " << id);
+    }
+
+    // 2. consumer-1 receives 1 message and pause
+    std::mutex mtx;
+    std::condition_variable condConsumer1FirstMessage;
+    std::condition_variable condConsumer1Completed;
+    IntVector messages1;
+    const auto conf1 = zeroQueueSharedConsumerConf("consumer-1", [&](Consumer 
consumer, const Message& msg) {
+        const auto numReceived = messages1.add(messageToInt(msg));
+        if (numReceived == 1) {
+            ASSERT_EQ(ResultOk, consumer.pauseMessageListener());
+            condConsumer1FirstMessage.notify_all();
+        } else if (numReceived == 5) {
+            ASSERT_EQ(ResultOk, consumer.pauseMessageListener());
+            condConsumer1Completed.notify_all();
+        }
+    });
+    Consumer consumer1;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, conf1, 
consumer1));
+    {
+        std::unique_lock<std::mutex> lock(mtx);
+        ASSERT_EQ(condConsumer1FirstMessage.wait_for(lock, 
std::chrono::seconds(3)),
+                  std::cv_status::no_timeout);
+        ASSERT_EQ(messages1.data(), (std::vector<int>{0}));
+    }
+
+    // 3. consumer-2 receives 5 messages and pause
+    std::condition_variable condConsumer2Completed;
+    IntVector messages2;
+    const auto conf2 = zeroQueueSharedConsumerConf("consumer-2", [&](Consumer 
consumer, const Message& msg) {
+        const int numReceived = messages2.add(messageToInt(msg));
+        if (numReceived == 5) {
+            ASSERT_EQ(ResultOk, consumer.pauseMessageListener());
+            condConsumer2Completed.notify_all();
+        }
+    });
+    Consumer consumer2;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, conf2, 
consumer2));
+    {
+        std::unique_lock<std::mutex> lock(mtx);
+        ASSERT_EQ(condConsumer2Completed.wait_for(lock, 
std::chrono::seconds(3)), std::cv_status::no_timeout);
+        ASSERT_EQ(messages2.data(), (std::vector<int>{1, 2, 3, 4, 5}));
+    }
+
+    // 4. consumer-1 resumes listening, and receives last 4 messages
+    ASSERT_EQ(ResultOk, consumer1.resumeMessageListener());
+    {
+        std::unique_lock<std::mutex> lock(mtx);
+        ASSERT_EQ(condConsumer1Completed.wait_for(lock, 
std::chrono::seconds(3)), std::cv_status::no_timeout);
+        ASSERT_EQ(messages1.data(), (std::vector<int>{0, 6, 7, 8, 9}));
+    }
+
+    client.close();
+}

Reply via email to