This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new a1cf401  Fix `StartMessageIdInclusive` not work when reader reads from 
latest msg id (#386)
a1cf401 is described below

commit a1cf401ec0dd423871537559050396a2b001065d
Author: Zike Yang <z...@apache.org>
AuthorDate: Mon Jan 15 15:11:23 2024 +0800

    Fix `StartMessageIdInclusive` not work when reader reads from latest msg id 
(#386)
    
    Fixes #385
    
    ### Motivation
    
    The reader with `StartMessageIdInclusive` enabled should be able to read 
messages from the latest message ID.
    
    ### Modifications
    
    - If `StartMessageIdInclusive` is enabled, the reader will seek and read 
the latest message in the topic.
---
 lib/ConsumerImpl.cc | 30 +++++++++++++++++++++++-------
 tests/ReaderTest.cc | 32 ++++++++++++++++++++++++++++++++
 2 files changed, 55 insertions(+), 7 deletions(-)

diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 5216218..dbd3b65 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -1508,18 +1508,34 @@ void 
ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
 
     if (messageId == MessageId::latest()) {
         lock.unlock();
-        getLastMessageIdAsync([callback](Result result, const 
GetLastMessageIdResponse& response) {
+        auto self = get_shared_this_ptr();
+        getLastMessageIdAsync([self, callback](Result result, const 
GetLastMessageIdResponse& response) {
             if (result != ResultOk) {
                 callback(result, {});
                 return;
             }
-            if (response.hasMarkDeletePosition() && 
response.getLastMessageId().entryId() >= 0) {
-                // We only care about comparing ledger ids and entry ids as 
mark delete position doesn't have
-                // other ids such as batch index
-                callback(ResultOk, 
compareLedgerAndEntryId(response.getMarkDeletePosition(),
-                                                           
response.getLastMessageId()) < 0);
+            auto handleResponse = [self, response, callback] {
+                if (response.hasMarkDeletePosition() && 
response.getLastMessageId().entryId() >= 0) {
+                    // We only care about comparing ledger ids and entry ids 
as mark delete position doesn't
+                    // have other ids such as batch index
+                    auto compareResult = 
compareLedgerAndEntryId(response.getMarkDeletePosition(),
+                                                                 
response.getLastMessageId());
+                    callback(ResultOk, 
self->config_.isStartMessageIdInclusive() ? compareResult <= 0
+                                                                               
  : compareResult < 0);
+                } else {
+                    callback(ResultOk, false);
+                }
+            };
+            if (self->config_.isStartMessageIdInclusive()) {
+                self->seekAsync(response.getLastMessageId(), [callback, 
handleResponse](Result result) {
+                    if (result != ResultOk) {
+                        callback(result, {});
+                        return;
+                    }
+                    handleResponse();
+                });
             } else {
-                callback(ResultOk, false);
+                handleResponse();
             }
         });
     } else {
diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc
index ac2fa23..723972d 100644
--- a/tests/ReaderTest.cc
+++ b/tests/ReaderTest.cc
@@ -752,4 +752,36 @@ TEST(ReaderSeekTest, testSeekForMessageId) {
     producer.close();
 }
 
+TEST(ReaderSeekTest, testStartAtLatestMessageId) {
+    Client client(serviceUrl);
+
+    const std::string topic = "test-seek-latest-message-id-" + 
std::to_string(time(nullptr));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+
+    MessageId id;
+    ASSERT_EQ(ResultOk, 
producer.send(MessageBuilder().setContent("msg").build(), id));
+
+    Reader readerExclusive;
+    ASSERT_EQ(ResultOk,
+              client.createReader(topic, MessageId::latest(), 
ReaderConfiguration(), readerExclusive));
+
+    Reader readerInclusive;
+    ASSERT_EQ(ResultOk,
+              client.createReader(topic, MessageId::latest(),
+                                  
ReaderConfiguration().setStartMessageIdInclusive(true), readerInclusive));
+
+    Message msg;
+    bool hasMsgAvaliable = false;
+    readerInclusive.hasMessageAvailable(hasMsgAvaliable);
+    ASSERT_TRUE(hasMsgAvaliable);
+    ASSERT_EQ(ResultOk, readerInclusive.readNext(msg, 3000));
+    ASSERT_EQ(ResultTimeout, readerExclusive.readNext(msg, 3000));
+
+    readerExclusive.close();
+    readerInclusive.close();
+    producer.close();
+}
+
 INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));

Reply via email to