This is an automated email from the ASF dual-hosted git repository.
nodece pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new d664fcfb fix: add batchIdx to messageID.String() when batched message
(#1491)
d664fcfb is described below
commit d664fcfb2284f2f41abace18e2d4163e8ab61c93
Author: adrianiacobghiula <[email protected]>
AuthorDate: Sun May 10 14:10:34 2026 +0200
fix: add batchIdx to messageID.String() when batched message (#1491)
* fix: add batchIdx to messageID.String() when batched message
* fix: add batchIdx to messageID.String() when batched message
---
pulsar/impl_message.go | 3 +++
pulsar/impl_message_test.go | 12 ++++++++++++
pulsaradmin/pkg/admin/subscription_test.go | 16 ++++++++++------
3 files changed, 25 insertions(+), 6 deletions(-)
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index de338516..7b68e190 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -194,6 +194,9 @@ func (id *messageID) BatchSize() int32 {
}
func (id *messageID) String() string {
+ if id.batchIdx > -1 {
+ return fmt.Sprintf("%d:%d:%d:%d", id.ledgerID, id.entryID,
id.partitionIdx, id.batchIdx)
+ }
return fmt.Sprintf("%d:%d:%d", id.ledgerID, id.entryID, id.partitionIdx)
}
diff --git a/pulsar/impl_message_test.go b/pulsar/impl_message_test.go
index 6a21171c..8f419517 100644
--- a/pulsar/impl_message_test.go
+++ b/pulsar/impl_message_test.go
@@ -129,3 +129,15 @@ func TestAckingMessageIDBatchTwo(t *testing.T) {
assert.Equal(t, true, ids[0].ack())
assert.Equal(t, true, tracker.completed())
}
+
+func TestMessageStringOnMessage(t *testing.T) {
+ id := newMessageID(1, 2, -1, 4, 0)
+
+ assert.Equal(t, "1:2:4", id.String())
+}
+
+func TestMessageStringOnBatchMessage(t *testing.T) {
+ id := newMessageID(1, 2, 3, 4, 5)
+
+ assert.Equal(t, "1:2:4:3", id.String())
+}
diff --git a/pulsaradmin/pkg/admin/subscription_test.go
b/pulsaradmin/pkg/admin/subscription_test.go
index 08d2cf22..2d937a6e 100644
--- a/pulsaradmin/pkg/admin/subscription_test.go
+++ b/pulsaradmin/pkg/admin/subscription_test.go
@@ -64,24 +64,28 @@ func TestGetMessagesByID(t *testing.T) {
var wg sync.WaitGroup
wg.Add(numberMessages)
- messageIDMap := make(map[string]int32)
+ // Group by ledger:entry (ignoring batchIdx) to count batch sizes
+ type ledgerEntry struct {
+ LedgerID int64
+ EntryID int64
+ }
+ messageIDMap := make(map[ledgerEntry]int32)
for i := 0; i <= numberMessages; i++ {
producer.SendAsync(ctx, &pulsar.ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}, func(id pulsar.MessageID, _ *pulsar.ProducerMessage, err
error) {
assert.Nil(t, err)
- messageIDMap[id.String()]++
+ key := ledgerEntry{LedgerID: id.LedgerID(), EntryID:
id.EntryID()}
+ messageIDMap[key]++
wg.Done()
})
}
wg.Wait()
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
- for id, i := range messageIDMap {
+ for key, i := range messageIDMap {
assert.Equal(t, i, int32(batchingMaxMessages))
- messageID, err := utils.ParseMessageID(id)
- assert.Nil(t, err)
- messages, err :=
admin.Subscriptions().GetMessagesByID(*topicName, messageID.LedgerID,
messageID.EntryID)
+ messages, err :=
admin.Subscriptions().GetMessagesByID(*topicName, key.LedgerID, key.EntryID)
assert.Nil(t, err)
assert.Equal(t, batchingMaxMessages, len(messages))
}