This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 23b60d1 Fix incorrect last sequence id when sending messages in batch (#546)
23b60d1 is described below
commit 23b60d1fe264fd48775fae7c7ebabae355d3c2a9
Author: zhanglistar <[email protected]>
AuthorDate: Thu Mar 5 22:29:13 2026 +0800
Fix incorrect last sequence id when sending messages in batch (#546)
---
lib/Commands.cc | 4 +++-
lib/ProducerImpl.cc | 24 ++++++++++++++----------
tests/KeyBasedBatchingTest.cc | 9 +++++----
tests/ProducerTest.cc | 38 ++++++++++++++++++++++++++++++++++++++
tests/ReaderTest.cc | 15 +++++++++++++--
5 files changed, 73 insertions(+), 17 deletions(-)
diff --git a/lib/Commands.cc b/lib/Commands.cc
index 30f5bf1..08dc718 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -902,7 +902,9 @@ uint64_t Commands::serializeSingleMessagesToBatchPayload(SharedBuffer& batchPayl
batchPayload.write(payload.data(), payload.readableBytes());
}
- return messages.back().impl_->metadata.sequence_id();
+ // Use the first message's sequence_id so that ackReceived can compute
+ // lastSequenceIdPublished_ = sequenceId + messagesCount - 1 correctly.
+ return messages.front().impl_->metadata.sequence_id();
}
Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage,
int32_t batchIndex,
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 360e128..c9a16e8 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -933,19 +933,24 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
return false;
}
- uint64_t expectedSequenceId = op.sendArgs->sequenceId;
- if (sequenceId > expectedSequenceId) {
- LOG_WARN(getName() << "Got ack for msg " << sequenceId //
- << " expecting: " << expectedSequenceId << " queue size=" //
- << pendingMessagesQueue_.size() << " producer: " << producerId_);
+ const uint64_t expectedFirstSequenceId = op.sendArgs->sequenceId;
+ const uint64_t expectedLastSequenceId = expectedFirstSequenceId + op.messagesCount - 1;
+ // Broker may ack with either the first or the last sequence id of the batch.
+ if (sequenceId > expectedLastSequenceId) {
+ LOG_WARN(getName() << "Got ack for msg " << sequenceId
+ << " expecting last: " << expectedLastSequenceId
+ << " queue size=" << pendingMessagesQueue_.size() << " producer: " << producerId_);
return false;
- } else if (sequenceId < expectedSequenceId) {
+ }
+ if (sequenceId < expectedFirstSequenceId) {
// Ignoring the ack since it's referring to a message that has already timed out.
- LOG_DEBUG(getName() << "Got ack for timed out msg " << sequenceId //
- << " -- MessageId - " << messageId << " last-seq: " << expectedSequenceId
- << " producer: " << producerId_);
+ LOG_DEBUG(getName() << "Got ack for timed out msg " << sequenceId << " -- MessageId - " << messageId
+ << " first-seq: " << expectedFirstSequenceId << " producer: " << producerId_);
return true;
}
+ // sequenceId is in [expectedFirstSequenceId, expectedLastSequenceId]; accept as matching this op.
+ const bool brokerSentFirst = (sequenceId == expectedFirstSequenceId);
+ lastSequenceIdPublished_ = brokerSentFirst ? expectedLastSequenceId : sequenceId;
// Message was persisted correctly
LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);
@@ -960,7 +965,6 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
}
releaseSemaphoreForSendOp(op);
- lastSequenceIdPublished_ = sequenceId + op.messagesCount - 1;
std::unique_ptr<OpSendMsg> opSendMsg{pendingMessagesQueue_.front().release()};
pendingMessagesQueue_.pop_front();
diff --git a/tests/KeyBasedBatchingTest.cc b/tests/KeyBasedBatchingTest.cc
index e596266..d5c5ce7 100644
--- a/tests/KeyBasedBatchingTest.cc
+++ b/tests/KeyBasedBatchingTest.cc
@@ -134,10 +134,11 @@ TEST_F(KeyBasedBatchingTest, testSequenceId) {
sendAsync("B", "3");
sendAsync("C", "4");
sendAsync("A", "5");
- // sequence id: B < C < A, so there are 3 batches in order as following:
+ // Batches are sent in ascending order of the first message's sequence id (BatchMessageKeyBasedContainer
+ // sorts by sendArgs->sequenceId). Send order gives A=0, B=1, C=2 for first per key, so batches: A, B, C.
+ // A: 0, 5
// B: 1, 3
// C: 2, 4
- // A: 0, 5
latch.wait();
std::vector<std::string> receivedKeys;
@@ -149,8 +150,8 @@ TEST_F(KeyBasedBatchingTest, testSequenceId) {
receivedValues.emplace_back(msg.getDataAsString());
}
- decltype(receivedKeys) expectedKeys{"B", "B", "C", "C", "A", "A"};
- decltype(receivedValues) expectedValues{"1", "3", "2", "4", "0", "5"};
+ decltype(receivedKeys) expectedKeys{"A", "A", "B", "B", "C", "C"};
+ decltype(receivedValues) expectedValues{"0", "5", "1", "3", "2", "4"};
EXPECT_EQ(receivedKeys, expectedKeys);
EXPECT_EQ(receivedValues, expectedValues);
}
diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc
index edb79e4..4220a2e 100644
--- a/tests/ProducerTest.cc
+++ b/tests/ProducerTest.cc
@@ -441,6 +441,44 @@ TEST_P(ProducerTest, testFlushNoBatch) {
client.close();
}
+// Verifies that getLastSequenceId() is correct after sendAsync + flush when batching is enabled.
+// Previously the batch used the last message's sequence_id, causing lastSequenceIdPublished_ to be
+// doubled (e.g. 3 messages yielded 4 instead of 2). The batch must use the first message's
+// sequence_id so that lastSequenceIdPublished_ = sequenceId + messagesCount - 1 is correct.
+TEST(ProducerTest, testGetLastSequenceIdAfterBatchFlush) {
+ Client client(serviceUrl);
+
+ const std::string topicName =
+ "persistent://public/default/testGetLastSequenceIdAfterBatchFlush-" + std::to_string(time(nullptr));
+
+ ProducerConfiguration producerConfiguration;
+ producerConfiguration.setBatchingEnabled(true);
+ producerConfiguration.setBatchingMaxMessages(10);
+ producerConfiguration.setBatchingMaxPublishDelayMs(60000);
+
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration, producer));
+
+ // Send 3 messages in a batch, then flush. Sequence ids are [0, 1, 2], so getLastSequenceId() must be 2.
+ for (int i = 0; i < 3; i++) {
+ Message msg = MessageBuilder().setContent("content").build();
+ producer.sendAsync(msg, nullptr);
+ }
+ ASSERT_EQ(ResultOk, producer.flush());
+ ASSERT_EQ(producer.getLastSequenceId(), 2) << "After 3 messages, last sequence id should be 2";
+
+ // Send 2 more (total 5), flush. Sequence ids for these are [3, 4], so getLastSequenceId() must be 4.
+ for (int i = 0; i < 2; i++) {
+ Message msg = MessageBuilder().setContent("content").build();
+ producer.sendAsync(msg, nullptr);
+ }
+ ASSERT_EQ(ResultOk, producer.flush());
+ ASSERT_EQ(producer.getLastSequenceId(), 4) << "After 5 messages total, last sequence id should be 4";
+
+ producer.close();
+ client.close();
+}
+
TEST_P(ProducerTest, testFlushBatch) {
Client client(serviceUrl);
diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc
index e4a924d..77719a1 100644
--- a/tests/ReaderTest.cc
+++ b/tests/ReaderTest.cc
@@ -22,6 +22,7 @@
#include <time.h>
#include <atomic>
+#include <chrono>
#include <functional>
#include <future>
#include <set>
@@ -863,7 +864,13 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) {
}
ASSERT_EQ(ResultOk, reader.seek(MessageId::latest()));
- ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+ // After seek-to-end the broker may close the consumer and trigger reconnect; allow a short
+ // delay for hasMessageAvailable to become false (avoids flakiness when reconnect completes).
+ for (int i = 0; i < 50; i++) {
+ ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+ if (!hasMessageAvailable) break;
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
ASSERT_FALSE(hasMessageAvailable);
producer.send(MessageBuilder().setContent("msg-2").build());
@@ -876,7 +883,11 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) {
// Test the 2nd seek
ASSERT_EQ(ResultOk, reader.seek(MessageId::latest()));
- ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+ for (int i = 0; i < 50; i++) {
+ ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+ if (!hasMessageAvailable) break;
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
ASSERT_FALSE(hasMessageAvailable);
}