This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 2f68615 C++ client producer sendAsync() method will be blocked forever, if enough batched messages sent timeout. (#4569) (#4657) 2f68615 is described below commit 2f6861579c0bfc370abd9ecf5409c683eaa96ee7 Author: Easyfan Zheng <zheng.easy...@gmail.com> AuthorDate: Thu Jul 4 03:16:56 2019 +0800 C++ client producer sendAsync() method will be blocked forever, if enough batched messages sent timeout. (#4569) (#4657) --- pulsar-client-cpp/lib/ProducerImpl.cc | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index b2e7848..bf9e3ac 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -232,6 +232,14 @@ void ProducerImpl::failPendingMessages(Result result) { // without holding producer mutex. for (MessageQueue::const_iterator it = pendingMessagesQueue_.begin(); it != pendingMessagesQueue_.end(); it++) { + // When dealing any failure message, if the current message is a batch one, we should also release + // the reserved spots in the pendingMessageQueue_, for all individual messages inside this batch + // message. See 'ProducerImpl::sendAsync' for more details. + if (it->msg_.impl_->metadata.has_num_messages_in_batch()) { + // batch message - need to release more spots + // -1 since the pushing batch message into the queue already released a spot + pendingMessagesQueue_.release(it->msg_.impl_->metadata.num_messages_in_batch() - 1); + } messagesToFail.push_back(*it); }