This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5293e2a3a97be85799336ebf168bca6e090e1c50 Author: Yunze Xu <[email protected]> AuthorDate: Mon Jan 24 12:06:00 2022 +0800 [C++] Fix hasMessageAvailable returns wrong value for last message (#13883) ### Motivation In C++ client, there is a corner case that when a reader's start message ID is the last message of a topic, `hasMessageAvailable` returns true. However, it should return false because the start message ID is exclusive and in this case `readNext` would never return a message unless new messages arrived. ### Modifications The current C++ implementation of `hasMessageAvailable` is from long days ago and has many problems. So this PR migrates the Java implementation of `hasMessageAvailable` to C++ client. Since after the modifications we need to access `startMessageId` in `hasMessageAvailable`, which is called in a different thread from `connectionOpened` that might modify `startMessageId`. We use a common mutex `mutexForMessageIds` to protect the access to `lastDequedMessageId_` and `lastMessageIdInBroker_`. To fix the original tests when `startMessageId` is latest, this PR adds a `GetLastMessageIdResponse` as the response of `GetLastMessageId` request. The `GetLastMessageIdResponse` contains the `consumer_mark_delete_position` introduced from https://github.com/apache/pulsar/pull/9652 to compare with `last_message_id` when `startMessageId` is latest. ### Verifying this change This change added tests `ReaderTest#testHasMessageAvailableWhenCreated` and `MessageIdTest# testCompareLedgerAndEntryId`. (cherry picked from commit e50493ea17dd5f2f9d4527d74cc4f40e12439df2) --- pulsar-client-cpp/lib/ConsumerImpl.cc | 4 ++++ pulsar-client-cpp/lib/ReaderImpl.cc | 4 +++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc index fa872f5..1f4a6b5 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.cc +++ b/pulsar-client-cpp/lib/ConsumerImpl.cc @@ -165,6 +165,10 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) { return; } + // Register consumer so that we can handle other incomming commands (e.g. ACTIVE_CONSUMER_CHANGE) after + // sending the subscribe request. + cnx->registerConsumer(consumerId_, shared_from_this()); + Lock lockForMessageId(mutexForMessageId_); Optional<MessageId> firstMessageInQueue = clearReceiveQueue(); if (subscriptionMode_ == Commands::SubscriptionModeNonDurable) { diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc index 48f5d58..0a7b321 100644 --- a/pulsar-client-cpp/lib/ReaderImpl.cc +++ b/pulsar-client-cpp/lib/ReaderImpl.cc @@ -139,7 +139,9 @@ void ReaderImpl::seekAsync(uint64_t timestamp, ResultCallback callback) { } void ReaderImpl::getLastMessageIdAsync(GetLastMessageIdCallback callback) { - consumer_->getLastMessageIdAsync(callback); + consumer_->getLastMessageIdAsync([callback](Result result, const GetLastMessageIdResponse& response) { + callback(result, response.getLastMessageId()); + }); } ReaderImplWeakPtr ReaderImpl::getReaderImplWeakPtr() { return readerImplWeakPtr_; }
