This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new bdfa6a08 Fix the issue of unable to parse non-batch messages that with 
non-empty properties and empty payloads. (#1435)
bdfa6a08 is described below

commit bdfa6a08d6b22ef367bda73b133424c45d710e8f
Author: Cong Zhao <[email protected]>
AuthorDate: Thu Nov 13 21:59:49 2025 +0800

    Fix the issue of unable to parse non-batch messages that with non-empty 
properties and empty payloads. (#1435)
    
    ### Motivation
    
    ```
    time="2025-11-03T12:22:21+08:00" level=info msg="Created producer" 
cnx="[::1]:55372 -> [::1]:6650" producerID=1 producer_name=standalone-0-14 
topic="persistent://public/default/my-topic-965686000"
    time="2025-11-03T12:22:21+08:00" level=error msg="Discarding corrupted 
message" consumerID=1 msgID="ledgerId:19 entryId:0 partition:-1" name= 
subscription=reader-ebjpf 
topic="persistent://public/default/my-topic-965686000" 
validationError=BatchDeSerializeError
    ```
    
    ### Modifications
    
    Correct parse non-batch messages that with non-empty properties and empty 
payloads.
---
 pulsar/internal/commands.go      |  9 ++++++++
 pulsar/internal/commands_test.go |  5 +++--
 pulsar/reader_test.go            | 46 ++++++++++++++++++++++++++++++++++++++++
 3 files changed, 58 insertions(+), 2 deletions(-)

diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index b9f46234..7cca37d3 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -75,6 +75,8 @@ type MessageReader struct {
        buffer Buffer
        // true if we are parsing a batched message - set after parsing the 
message metadata
        batched bool
+       // true if the message has properties - set after parsing the message 
metadata
+       hasProperties bool
 }
 
 // ReadChecksum
@@ -118,6 +120,10 @@ func (r *MessageReader) ReadMessageMetadata() 
(*pb.MessageMetadata, error) {
                r.batched = true
        }
 
+       if len(meta.Properties) > 0 {
+               r.hasProperties = true
+       }
+
        return &meta, nil
 }
 
@@ -137,6 +143,9 @@ func (r *MessageReader) ReadBrokerMetadata() 
(*pb.BrokerEntryMetadata, error) {
 
 func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, 
error) {
        if r.buffer.ReadableBytes() == 0 && r.buffer.Capacity() > 0 {
+               if !r.batched && r.hasProperties {
+                       return nil, []byte{}, nil
+               }
                return nil, nil, ErrEOM
        }
        if !r.batched {
diff --git a/pulsar/internal/commands_test.go b/pulsar/internal/commands_test.go
index c236e10c..9426a543 100644
--- a/pulsar/internal/commands_test.go
+++ b/pulsar/internal/commands_test.go
@@ -98,8 +98,9 @@ func TestReadMessageOldFormat(t *testing.T) {
        assert.Equal(t, true, ssm == nil)
        assert.Equal(t, "hello", string(payload))
 
-       _, _, err = reader.ReadMessage()
-       assert.Equal(t, ErrEOM, err)
+       _, payload, err = reader.ReadMessage()
+       assert.Equal(t, []byte{}, payload)
+       assert.Nil(t, err)
 }
 
 func TestReadMessagesBatchSize1(t *testing.T) {
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index 58c72525..e9d01ee1 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -1298,3 +1298,49 @@ func TestReaderReadFromLatest(t *testing.T) {
        require.Error(t, err)
        require.Nil(t, msg)
 }
+
+func TestReaderEmptyPayloadNonemptyPropsNonBatch(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+       ctx := context.Background()
+
+       // create reader
+       reader, err := client.CreateReader(ReaderOptions{
+               Topic:          topic,
+               StartMessageID: EarliestMessageID(),
+       })
+       assert.Nil(t, err)
+       defer reader.Close()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: true,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       // send 10 messages
+       for i := 0; i < 10; i++ {
+               _, err := producer.Send(ctx, &ProducerMessage{
+                       Properties: map[string]string{"key": "value"},
+                       Payload:    []byte{},
+               })
+               assert.NoError(t, err)
+       }
+
+       // receive 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := reader.Next(ctx)
+               assert.NoError(t, err)
+
+               assert.Equal(t, map[string]string{"key": "value"}, 
msg.Properties())
+               assert.Equal(t, []byte{}, msg.Payload())
+       }
+}

Reply via email to