This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f68615  C++ client producer sendAsync() method will be blocked forever, if enough batched messages sent timeout. (#4569) (#4657)
2f68615 is described below

commit 2f6861579c0bfc370abd9ecf5409c683eaa96ee7
Author: Easyfan Zheng <zheng.easy...@gmail.com>
AuthorDate: Thu Jul 4 03:16:56 2019 +0800

    C++ client producer sendAsync() method will be blocked forever, if enough batched messages sent timeout. (#4569) (#4657)
---
 pulsar-client-cpp/lib/ProducerImpl.cc | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index b2e7848..bf9e3ac 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -232,6 +232,14 @@ void ProducerImpl::failPendingMessages(Result result) {
     // without holding producer mutex.
     for (MessageQueue::const_iterator it = pendingMessagesQueue_.begin(); it != pendingMessagesQueue_.end();
          it++) {
+        // When dealing any failure message, if the current message is a batch one, we should also release
+        // the reserved spots in the pendingMessageQueue_, for all individual messages inside this batch
+        // message. See 'ProducerImpl::sendAsync' for more details.
+        if (it->msg_.impl_->metadata.has_num_messages_in_batch()) {
+            // batch message - need to release more spots
+            // -1 since the pushing batch message into the queue already released a spot
+            pendingMessagesQueue_.release(it->msg_.impl_->metadata.num_messages_in_batch() - 1);
+        }
         messagesToFail.push_back(*it);
     }
 

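The accounting behind the fix can be illustrated with a minimal, single-threaded
sketch. It is not the Pulsar implementation: ReservingQueue, Entry, and
freeSpotsAfterTimeout are hypothetical names, and the model assumes that every
individual message reserves one spot before it is batched, and that pushing the
batch into the queue converts exactly one of those reservations into an occupied
entry. Under those assumptions a batch of N messages still holds N - 1
reservations after it is queued, and they must be released when the batch fails.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical stand-in for one pending queue entry; not the Pulsar type.
struct Entry {
    std::size_t numMessagesInBatch;  // 1 for a message that is not batched
};

// Hypothetical bounded queue that counts reserved spots and queued entries
// against one shared capacity. Single-threaded on purpose: no locking.
class ReservingQueue {
   public:
    explicit ReservingQueue(std::size_t capacity) : capacity_(capacity) {}

    // Hold one spot for a message that will be queued later.
    // Returns false where a blocking queue would make the caller wait.
    bool tryReserve() {
        if (entries_.size() + reserved_ >= capacity_) return false;
        ++reserved_;
        return true;
    }

    // Give back n previously reserved spots.
    void release(std::size_t n) {
        assert(n <= reserved_);
        reserved_ -= n;
    }

    // Queue an entry, turning exactly one reservation into an occupied slot.
    void pushReserved(const Entry& e) {
        assert(reserved_ > 0);
        --reserved_;
        entries_.push_back(e);
    }

    const std::deque<Entry>& entries() const { return entries_; }
    void clear() { entries_.clear(); }
    std::size_t freeSpots() const { return capacity_ - entries_.size() - reserved_; }

   private:
    std::size_t capacity_;
    std::size_t reserved_ = 0;
    std::deque<Entry> entries_;
};

// Queue one batch of four messages into a queue of capacity four, fail it,
// and report how many spots are free afterwards.
std::size_t freeSpotsAfterTimeout(bool releaseBatchSpots) {
    ReservingQueue queue(4);
    for (int i = 0; i < 4; ++i) {
        bool ok = queue.tryReserve();  // one reservation per individual message
        assert(ok);
        (void)ok;
    }
    queue.pushReserved(Entry{4});  // the batch becomes a single queue entry

    // Failure path: optionally release the reservations the batch still holds.
    for (const Entry& e : queue.entries()) {
        if (releaseBatchSpots) queue.release(e.numMessagesInBatch - 1);
    }
    queue.clear();
    return queue.freeSpots();
}
```

In this model freeSpotsAfterTimeout(false) returns 1 and
freeSpotsAfterTimeout(true) returns 4. Without the extra release, every timed-out
batch permanently shrinks the usable capacity, which is how a blocking reserve
call could eventually wait forever.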