This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push: new a1cf401 Fix `StartMessageIdInclusive` not work when reader reads from latest msg id (#386) a1cf401 is described below commit a1cf401ec0dd423871537559050396a2b001065d Author: Zike Yang <z...@apache.org> AuthorDate: Mon Jan 15 15:11:23 2024 +0800 Fix `StartMessageIdInclusive` not work when reader reads from latest msg id (#386) Fixes #385 ### Motivation The reader with `StartMessageIdInclusive` enabled should be able to reads messages from the latest message ID. ### Modifications - If `StartMessageIdInclusive` is enabled, the reader will seek and read the latest message in the topic. --- lib/ConsumerImpl.cc | 30 +++++++++++++++++++++++------- tests/ReaderTest.cc | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 7 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 5216218..dbd3b65 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -1508,18 +1508,34 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback if (messageId == MessageId::latest()) { lock.unlock(); - getLastMessageIdAsync([callback](Result result, const GetLastMessageIdResponse& response) { + auto self = get_shared_this_ptr(); + getLastMessageIdAsync([self, callback](Result result, const GetLastMessageIdResponse& response) { if (result != ResultOk) { callback(result, {}); return; } - if (response.hasMarkDeletePosition() && response.getLastMessageId().entryId() >= 0) { - // We only care about comparing ledger ids and entry ids as mark delete position doesn't have - // other ids such as batch index - callback(ResultOk, compareLedgerAndEntryId(response.getMarkDeletePosition(), - response.getLastMessageId()) < 0); + auto handleResponse = [self, response, callback] { + if (response.hasMarkDeletePosition() && response.getLastMessageId().entryId() >= 0) { + // We only care about comparing ledger ids and entry ids as mark delete position doesn't + // have other ids such as batch index + auto compareResult = compareLedgerAndEntryId(response.getMarkDeletePosition(), + response.getLastMessageId()); + callback(ResultOk, self->config_.isStartMessageIdInclusive() ? compareResult <= 0 + : compareResult < 0); + } else { + callback(ResultOk, false); + } + }; + if (self->config_.isStartMessageIdInclusive()) { + self->seekAsync(response.getLastMessageId(), [callback, handleResponse](Result result) { + if (result != ResultOk) { + callback(result, {}); + return; + } + handleResponse(); + }); } else { - callback(ResultOk, false); + handleResponse(); } }); } else { diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc index ac2fa23..723972d 100644 --- a/tests/ReaderTest.cc +++ b/tests/ReaderTest.cc @@ -752,4 +752,36 @@ TEST(ReaderSeekTest, testSeekForMessageId) { producer.close(); } +TEST(ReaderSeekTest, testStartAtLatestMessageId) { + Client client(serviceUrl); + + const std::string topic = "test-seek-latest-message-id-" + std::to_string(time(nullptr)); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producer)); + + MessageId id; + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg").build(), id)); + + Reader readerExclusive; + ASSERT_EQ(ResultOk, + client.createReader(topic, MessageId::latest(), ReaderConfiguration(), readerExclusive)); + + Reader readerInclusive; + ASSERT_EQ(ResultOk, + client.createReader(topic, MessageId::latest(), + ReaderConfiguration().setStartMessageIdInclusive(true), readerInclusive)); + + Message msg; + bool hasMsgAvaliable = false; + readerInclusive.hasMessageAvailable(hasMsgAvaliable); + ASSERT_TRUE(hasMsgAvaliable); + ASSERT_EQ(ResultOk, readerInclusive.readNext(msg, 3000)); + ASSERT_EQ(ResultTimeout, readerExclusive.readNext(msg, 3000)); + + readerExclusive.close(); + readerInclusive.close(); + producer.close(); +} + INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));