This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5293e2a3a97be85799336ebf168bca6e090e1c50
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Jan 24 12:06:00 2022 +0800

    [C++] Fix hasMessageAvailable returns wrong value for last message (#13883)
    
    ### Motivation
    
    In the C++ client, there is a corner case: when a reader's start message
    ID is the last message of a topic, `hasMessageAvailable` returns true.
    However, it should return false because the start message ID is exclusive,
    and in this case `readNext` would never return a message unless new
    messages arrive.
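
    The exclusive-start rule above can be illustrated with a minimal,
    self-contained sketch. `SimpleMessageId` and
    `hasMessageAfterExclusiveStart` are hypothetical names used only for
    illustration; they are not part of the Pulsar C++ client.

```cpp
#include <cstdint>
#include <tuple>

// Hypothetical stand-in for a message ID; not the real pulsar::MessageId.
struct SimpleMessageId {
    int64_t ledgerId;
    int64_t entryId;
};

// Order by ledger ID first, then by entry ID.
inline bool operator<(const SimpleMessageId& a, const SimpleMessageId& b) {
    return std::tie(a.ledgerId, a.entryId) < std::tie(b.ledgerId, b.entryId);
}

// With an exclusive start ID, a message is available only if the last
// message in the topic lies strictly after the start position.
inline bool hasMessageAfterExclusiveStart(const SimpleMessageId& start,
                                          const SimpleMessageId& lastInTopic) {
    return start < lastInTopic;
}
```

    When the start ID equals the last message ID, the comparison is false,
    which is the behavior this fix aims for.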
    
    ### Modifications
    
    The current C++ implementation of `hasMessageAvailable` was written long
    ago and has many problems, so this PR ports the Java implementation of
    `hasMessageAvailable` to the C++ client.
    
    After the modifications, `hasMessageAvailable` needs to access
    `startMessageId`. It is called from a different thread than
    `connectionOpened`, which might modify `startMessageId`, so we use a
    common mutex `mutexForMessageIds` to protect access to
    `lastDequedMessageId_` and `lastMessageIdInBroker_`.
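
    As a rough sketch of the locking pattern described above (one mutex
    guarding several message IDs that are written and read on different
    threads), consider the following. `IdPair` and `MessageIdState` are
    hypothetical names for illustration and do not mirror the actual
    `ConsumerImpl` members.

```cpp
#include <cstdint>
#include <mutex>
#include <tuple>

// Hypothetical (ledger, entry) pair; not the real pulsar::MessageId.
struct IdPair {
    int64_t ledgerId;
    int64_t entryId;
};

class MessageIdState {
   public:
    // Writer side, e.g. called when a message is dequeued.
    void setLastDequeued(IdPair id) {
        std::lock_guard<std::mutex> lock(mutex_);
        lastDequeued_ = id;
    }

    // Writer side, e.g. called when the broker reports its last message ID.
    void setLastInBroker(IdPair id) {
        std::lock_guard<std::mutex> lock(mutex_);
        lastInBroker_ = id;
    }

    // Reader side: both IDs are read under the same lock, so the
    // comparison always sees a consistent pair.
    bool hasMessageAvailable() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return std::tie(lastDequeued_.ledgerId, lastDequeued_.entryId) <
               std::tie(lastInBroker_.ledgerId, lastInBroker_.entryId);
    }

   private:
    mutable std::mutex mutex_;  // one mutex shared by both IDs
    IdPair lastDequeued_{-1, -1};
    IdPair lastInBroker_{-1, -1};
};
```

    Using a single mutex for both fields keeps the read in
    `hasMessageAvailable` atomic with respect to either update.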
    
    To fix the original tests when `startMessageId` is latest, this PR adds a
    `GetLastMessageIdResponse` as the response to the `GetLastMessageId`
    request. The `GetLastMessageIdResponse` contains the
    `consumer_mark_delete_position` introduced in
    https://github.com/apache/pulsar/pull/9652, which is compared with
    `last_message_id` when `startMessageId` is latest.
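
    The comparison described above might look roughly like the sketch below.
    It assumes that a message is available only when the last message ID is
    strictly after the mark-delete position, and that a negative entry ID
    denotes an empty topic; both are assumptions made for illustration.
    `Position`, `LastMessageIdInfo`, and `hasUnreadWhenStartIsLatest` are
    hypothetical names, not the client's actual types.

```cpp
#include <cstdint>
#include <tuple>

// Hypothetical (ledger, entry) position.
struct Position {
    int64_t ledgerId;
    int64_t entryId;
};

// Hypothetical mirror of the two fields carried by the response.
struct LastMessageIdInfo {
    Position lastMessageId;
    Position markDeletePosition;
};

inline bool hasUnreadWhenStartIsLatest(const LastMessageIdInfo& info) {
    // Assumption: a negative entry ID means the topic holds no messages.
    if (info.lastMessageId.entryId < 0) {
        return false;
    }
    // Assumption: unread messages exist only if the last message lies
    // strictly after the mark-delete position.
    return std::tie(info.markDeletePosition.ledgerId, info.markDeletePosition.entryId) <
           std::tie(info.lastMessageId.ledgerId, info.lastMessageId.entryId);
}
```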
    
    ### Verifying this change
    
    This change added the tests `ReaderTest#testHasMessageAvailableWhenCreated`
    and `MessageIdTest#testCompareLedgerAndEntryId`.
    
    (cherry picked from commit e50493ea17dd5f2f9d4527d74cc4f40e12439df2)
---
 pulsar-client-cpp/lib/ConsumerImpl.cc | 4 ++++
 pulsar-client-cpp/lib/ReaderImpl.cc   | 4 +++-
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index fa872f5..1f4a6b5 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -165,6 +165,10 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
         return;
     }
 
+    // Register consumer so that we can handle other incomming commands (e.g. ACTIVE_CONSUMER_CHANGE) after
+    // sending the subscribe request.
+    cnx->registerConsumer(consumerId_, shared_from_this());
+
     Lock lockForMessageId(mutexForMessageId_);
     Optional<MessageId> firstMessageInQueue = clearReceiveQueue();
     if (subscriptionMode_ == Commands::SubscriptionModeNonDurable) {
diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc
index 48f5d58..0a7b321 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -139,7 +139,9 @@ void ReaderImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
 }
 
 void ReaderImpl::getLastMessageIdAsync(GetLastMessageIdCallback callback) {
-    consumer_->getLastMessageIdAsync(callback);
+    consumer_->getLastMessageIdAsync([callback](Result result, const GetLastMessageIdResponse& response) {
+        callback(result, response.getLastMessageId());
+    });
 }
 
 ReaderImplWeakPtr ReaderImpl::getReaderImplWeakPtr() { return readerImplWeakPtr_; }
