This is an automated email from the ASF dual-hosted git repository.

crossoverjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new ed7d4980 fix: improve zero queue consumer support for partitioned 
topics (#1424)
ed7d4980 is described below

commit ed7d4980034871e1db28770576151c4c05c7d0ea
Author: crossoverJie <[email protected]>
AuthorDate: Fri Oct 10 14:15:06 2025 +0800

    fix: improve zero queue consumer support for partitioned topics (#1424)
---
 pulsar/consumer_impl.go            |  8 ----
 pulsar/consumer_zero_queue.go      | 19 ++++++---
 pulsar/consumer_zero_queue_test.go | 79 ++++++++++++++++++++++++++++++++------
 3 files changed, 80 insertions(+), 26 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 20227bee..05db0726 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -23,12 +23,9 @@ import (
        "fmt"
        "math/rand"
        "strconv"
-       "strings"
        "sync"
        "time"
 
-       "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
-
        "github.com/apache/pulsar-client-go/pulsar/crypto"
        "github.com/apache/pulsar-client-go/pulsar/internal"
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
@@ -269,11 +266,6 @@ func newInternalConsumer(client *client, options 
ConsumerOptions, topic string,
                return nil, pkgerrors.New("ZeroQueueConsumer is not supported 
for partitioned topics")
        }
 
-       if len(partitions) == 1 && options.EnableZeroQueueConsumer &&
-               strings.Contains(partitions[0], utils.PARTITIONEDTOPICSUFFIX) {
-               return nil, pkgerrors.New("ZeroQueueConsumer is not supported 
for partitioned topics")
-       }
-
        if len(partitions) == 1 && options.EnableZeroQueueConsumer {
                return newZeroConsumer(client, options, topic, messageCh, dlq, 
rlq, disableForceTopicCreation)
        }
diff --git a/pulsar/consumer_zero_queue.go b/pulsar/consumer_zero_queue.go
index 3f2862da..5b85df8e 100644
--- a/pulsar/consumer_zero_queue.go
+++ b/pulsar/consumer_zero_queue.go
@@ -66,7 +66,11 @@ func newZeroConsumer(client *client, options 
ConsumerOptions, topic string,
                consumerName:              options.Name,
                metrics:                   
client.metrics.GetLeveledMetrics(topic),
        }
-       opts := newPartitionConsumerOpts(zc.topic, zc.consumerName, 0, 
zc.options)
+       tn, err := internal.ParseTopicName(topic)
+       if err != nil {
+               return nil, err
+       }
+       opts := newPartitionConsumerOpts(zc.topic, zc.consumerName, 
tn.Partition, zc.options)
        conn, err := newPartitionConsumer(zc, zc.client, opts, zc.messageCh, 
zc.dlq, zc.metrics)
        if err != nil {
                return nil, err
@@ -142,11 +146,14 @@ func (z *zeroQueueConsumer) Ack(m Message) error {
 
 func (z *zeroQueueConsumer) checkMsgIDPartition(msgID MessageID) error {
        partition := msgID.PartitionIdx()
-       if partition != 0 {
-               z.log.Errorf("invalid partition index %d expected a partition 
equal to 0",
-                       partition)
-               return fmt.Errorf("invalid partition index %d expected a 
partition equal to 0",
-                       partition)
+       if partition == 0 || partition == -1 {
+               return nil
+       }
+       if partition != z.pc.partitionIdx {
+               z.log.Errorf("invalid partition index %d expected a partition 
equal to %d",
+                       partition, z.pc.partitionIdx)
+               return fmt.Errorf("invalid partition index %d expected a 
partition equal to %d",
+                       partition, z.pc.partitionIdx)
        }
        return nil
 }
diff --git a/pulsar/consumer_zero_queue_test.go 
b/pulsar/consumer_zero_queue_test.go
index 06db433b..72048d79 100644
--- a/pulsar/consumer_zero_queue_test.go
+++ b/pulsar/consumer_zero_queue_test.go
@@ -115,7 +115,8 @@ func TestNormalZeroQueueConsumer(t *testing.T) {
                assert.Equal(t, "pulsar", msg.Key())
                assert.Equal(t, expectProperties, msg.Properties())
                // ack message
-               consumer.Ack(msg)
+               err = consumer.Ack(msg)
+               assert.Nil(t, err)
                log.Printf("receive message: %s", msg.ID().String())
        }
        err = consumer.Unsubscribe()
@@ -228,7 +229,8 @@ func TestReconnectConsumer(t *testing.T) {
                assert.Equal(t, "pulsar", msg.Key())
                assert.Equal(t, expectProperties, msg.Properties())
                // ack message
-               consumer.Ack(msg)
+               err = consumer.Ack(msg)
+               assert.Nil(t, err)
                log.Printf("receive message: %s", msg.ID().String())
        }
        err = consumer.Unsubscribe()
@@ -341,7 +343,7 @@ func TestPartitionZeroQueueConsumer(t *testing.T) {
        assert.Nil(t, consumer)
        assert.Error(t, err, "ZeroQueueConsumer is not supported for 
partitioned topics")
 }
-func TestOnePartitionZeroQueueConsumer(t *testing.T) {
+func TestSpecifiedPartitionZeroQueueConsumer(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,
        })
@@ -350,17 +352,65 @@ func TestOnePartitionZeroQueueConsumer(t *testing.T) {
        defer client.Close()
 
        topic := newTopicName()
-       err = createPartitionedTopic(topic, 1)
+       ctx := context.Background()
+       err = createPartitionedTopic(topic, 2)
+       assert.Nil(t, err)
+       topics, err := client.TopicPartitions(topic)
        assert.Nil(t, err)
 
        // create consumer
        consumer, err := client.Subscribe(ConsumerOptions{
-               Topic:                   topic,
+               Topic:                   topics[1],
                SubscriptionName:        "my-sub",
                EnableZeroQueueConsumer: true,
        })
-       assert.Nil(t, consumer)
-       assert.Error(t, err, "ZeroQueueConsumer is not supported for 
partitioned topics")
+       assert.Nil(t, err)
+       _, ok := consumer.(*zeroQueueConsumer)
+       assert.True(t, ok)
+       defer consumer.Close()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topics[1],
+               DisableBatching: false,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       // send 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+                       Key:     "pulsar",
+                       Properties: map[string]string{
+                               "key-1": "pulsar-1",
+                       },
+               })
+               assert.Nil(t, err)
+               log.Printf("send message: %s", msg.String())
+       }
+
+       // receive 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := consumer.Receive(context.Background())
+               if err != nil {
+                       log.Fatal(err)
+               }
+
+               expectMsg := fmt.Sprintf("hello-%d", i)
+               expectProperties := map[string]string{
+                       "key-1": "pulsar-1",
+               }
+               assert.Equal(t, []byte(expectMsg), msg.Payload())
+               assert.Equal(t, "pulsar", msg.Key())
+               assert.Equal(t, expectProperties, msg.Properties())
+               // ack message
+               err = consumer.Ack(msg)
+               assert.Nil(t, err)
+               log.Printf("receive message: %s", msg.ID().String())
+       }
+       err = consumer.Unsubscribe()
+       assert.Nil(t, err)
 }
 
 func TestZeroQueueConsumerGetLastMessageIDs(t *testing.T) {
@@ -576,7 +626,8 @@ func TestZeroQueueConsumer_Nack(t *testing.T) {
 
                if i%2 == 0 {
                        // Only acks even messages
-                       consumer.Ack(msg)
+                       err = consumer.Ack(msg)
+                       assert.Nil(t, err)
                } else {
                        // Fails to process odd messages
                        consumer.Nack(msg)
@@ -591,7 +642,8 @@ func TestZeroQueueConsumer_Nack(t *testing.T) {
                assert.Nil(t, err)
                assert.Equal(t, fmt.Sprintf("msg-content-%d", i), 
string(msg.Payload()))
 
-               consumer.Ack(msg)
+               err = consumer.Ack(msg)
+               assert.Nil(t, err)
        }
 }
 
@@ -641,7 +693,8 @@ func TestZeroQueueConsumer_Seek(t *testing.T) {
                msg, err := consumer.Receive(ctx)
                assert.Nil(t, err)
                assert.Equal(t, fmt.Sprintf("hello-%d", i), 
string(msg.Payload()))
-               consumer.Ack(msg)
+               err = consumer.Ack(msg)
+               assert.Nil(t, err)
        }
 
        err = consumer.Seek(seekID)
@@ -698,7 +751,8 @@ func TestZeroQueueConsumer_SeekByTime(t *testing.T) {
                msg, err := consumer.Receive(ctx)
                assert.Nil(t, err)
                assert.Equal(t, fmt.Sprintf("hello-%d", i), 
string(msg.Payload()))
-               consumer.Ack(msg)
+               err = consumer.Ack(msg)
+               assert.Nil(t, err)
        }
 
        currentTimestamp := time.Now()
@@ -711,6 +765,7 @@ func TestZeroQueueConsumer_SeekByTime(t *testing.T) {
                msg, err := consumer.Receive(ctx)
                assert.Nil(t, err)
                assert.Equal(t, fmt.Sprintf("hello-%d", i), 
string(msg.Payload()))
-               consumer.Ack(msg)
+               err = consumer.Ack(msg)
+               assert.Nil(t, err)
        }
 }

Reply via email to