This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new bdfa6a08 Fix the issue of being unable to parse non-batch messages with
non-empty properties and empty payloads. (#1435)
bdfa6a08 is described below
commit bdfa6a08d6b22ef367bda73b133424c45d710e8f
Author: Cong Zhao <[email protected]>
AuthorDate: Thu Nov 13 21:59:49 2025 +0800
Fix the issue of being unable to parse non-batch messages with non-empty
properties and empty payloads. (#1435)
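### Motivation
A non-batch message with non-empty properties and an empty payload is
discarded on the receiving side as corrupted, as the following log shows: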
```
time="2025-11-03T12:22:21+08:00" level=info msg="Created producer"
cnx="[::1]:55372 -> [::1]:6650" producerID=1 producer_name=standalone-0-14
topic="persistent://public/default/my-topic-965686000"
time="2025-11-03T12:22:21+08:00" level=error msg="Discarding corrupted
message" consumerID=1 msgID="ledgerId:19 entryId:0 partition:-1" name=
subscription=reader-ebjpf
topic="persistent://public/default/my-topic-965686000"
validationError=BatchDeSerializeError
```
### Modifications
Correctly parse non-batch messages that have non-empty properties and empty
payloads.
---
pulsar/internal/commands.go | 9 ++++++++
pulsar/internal/commands_test.go | 5 +++--
pulsar/reader_test.go | 46 ++++++++++++++++++++++++++++++++++++++++
3 files changed, 58 insertions(+), 2 deletions(-)
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index b9f46234..7cca37d3 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -75,6 +75,8 @@ type MessageReader struct {
buffer Buffer
// true if we are parsing a batched message - set after parsing the
message metadata
batched bool
+ // true if the message has properties - set after parsing the message
metadata
+ hasProperties bool
}
// ReadChecksum
@@ -118,6 +120,10 @@ func (r *MessageReader) ReadMessageMetadata()
(*pb.MessageMetadata, error) {
r.batched = true
}
+ if len(meta.Properties) > 0 {
+ r.hasProperties = true
+ }
+
return &meta, nil
}
@@ -137,6 +143,9 @@ func (r *MessageReader) ReadBrokerMetadata()
(*pb.BrokerEntryMetadata, error) {
func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte,
error) {
if r.buffer.ReadableBytes() == 0 && r.buffer.Capacity() > 0 {
+ if !r.batched && r.hasProperties {
+ return nil, []byte{}, nil
+ }
return nil, nil, ErrEOM
}
if !r.batched {
diff --git a/pulsar/internal/commands_test.go b/pulsar/internal/commands_test.go
index c236e10c..9426a543 100644
--- a/pulsar/internal/commands_test.go
+++ b/pulsar/internal/commands_test.go
@@ -98,8 +98,9 @@ func TestReadMessageOldFormat(t *testing.T) {
assert.Equal(t, true, ssm == nil)
assert.Equal(t, "hello", string(payload))
- _, _, err = reader.ReadMessage()
- assert.Equal(t, ErrEOM, err)
+ _, payload, err = reader.ReadMessage()
+ assert.Equal(t, []byte{}, payload)
+ assert.Nil(t, err)
}
func TestReadMessagesBatchSize1(t *testing.T) {
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index 58c72525..e9d01ee1 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -1298,3 +1298,49 @@ func TestReaderReadFromLatest(t *testing.T) {
require.Error(t, err)
require.Nil(t, msg)
}
+
+func TestReaderEmptyPayloadNonemptyPropsNonBatch(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ ctx := context.Background()
+
+ // create reader
+ reader, err := client.CreateReader(ReaderOptions{
+ Topic: topic,
+ StartMessageID: EarliestMessageID(),
+ })
+ assert.Nil(t, err)
+ defer reader.Close()
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ // send 10 messages
+ for i := 0; i < 10; i++ {
+ _, err := producer.Send(ctx, &ProducerMessage{
+ Properties: map[string]string{"key": "value"},
+ Payload: []byte{},
+ })
+ assert.NoError(t, err)
+ }
+
+ // receive 10 messages
+ for i := 0; i < 10; i++ {
+ msg, err := reader.Next(ctx)
+ assert.NoError(t, err)
+
+ assert.Equal(t, map[string]string{"key": "value"},
msg.Properties())
+ assert.Equal(t, []byte{}, msg.Payload())
+ }
+}