This is an automated email from the ASF dual-hosted git repository.
crossoverjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 41bc6f4d fix: enhance zero queue consumer reconnection handling and message permit management (#1443)
41bc6f4d is described below
commit 41bc6f4d416d69cfd25ea90a8efa9c56d03cbde3
Author: crossoverJie <[email protected]>
AuthorDate: Fri Dec 5 15:05:19 2025 +0800
fix: enhance zero queue consumer reconnection handling and message permit management (#1443)
Co-authored-by: Copilot <[email protected]>
Co-authored-by: Zixuan Liu <[email protected]>
---
pulsar/consumer_partition.go | 18 +++--
pulsar/consumer_partition_test.go | 3 +
pulsar/consumer_zero_queue.go | 24 ++++++-
pulsar/consumer_zero_queue_test.go | 142 +++++++++++++++++++++++++++++++++++++
pulsar/impl_message.go | 7 ++
5 files changed, 184 insertions(+), 10 deletions(-)
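
For context on the change below: previously, a successful reconnect unconditionally granted one permit whenever the zero-queue consumer was enabled; the fix delegates that decision to a zeroQueueReconnectedPolicy callback, which re-grants a permit only while a Receive() call is actually blocked. A minimal, self-contained sketch of the idea (the types here are simplified stand-ins for illustration, not the client's real API):

    package main

    import (
    	"fmt"
    	"sync/atomic"
    )

    // Simplified stand-ins for the diff's types (hypothetical, illustration only).
    type partitionConsumer struct {
    	availablePermits atomic.Int32 // flow-control permits granted to the broker
    }

    type partitionConsumerOpts struct {
    	// Invoked after a successful reconnect when the zero-queue consumer is
    	// enabled, instead of granting a permit unconditionally.
    	zeroQueueReconnectedPolicy func(*partitionConsumer)
    }

    func main() {
    	var waitingOnReceive atomic.Bool // set while a Receive() call is blocked

    	opts := partitionConsumerOpts{
    		zeroQueueReconnectedPolicy: func(pc *partitionConsumer) {
    			// Re-grant a permit only if a caller is actually waiting;
    			// otherwise the broker would push a message that sits unacked.
    			if waitingOnReceive.Load() {
    				pc.availablePermits.Add(1)
    			}
    		},
    	}

    	pc := &partitionConsumer{}

    	// Reconnect with no Receive() in flight: no permit is granted.
    	opts.zeroQueueReconnectedPolicy(pc)
    	fmt.Println("permits:", pc.availablePermits.Load()) // permits: 0

    	// Reconnect while a Receive() is blocked: one permit is re-granted.
    	waitingOnReceive.Store(true)
    	opts.zeroQueueReconnectedPolicy(pc)
    	fmt.Println("permits:", pc.availablePermits.Load()) // permits: 1
    }

This keeps the broker from pushing a message that no caller is waiting for, which the new test below guards against by asserting zero unacked messages after a topic unload.
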
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index bfd44b18..ab35b7a1 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -124,10 +124,11 @@ type partitionConsumerOpts struct {
expireTimeOfIncompleteChunk time.Duration
autoAckIncompleteChunk bool
// in failover mode, this callback will be called when the consumer changes
- consumerEventListener ConsumerEventListener
- enableBatchIndexAck bool
- ackGroupingOptions *AckGroupingOptions
- enableZeroQueueConsumer bool
+ consumerEventListener ConsumerEventListener
+ enableBatchIndexAck bool
+ ackGroupingOptions *AckGroupingOptions
+ enableZeroQueueConsumer bool
+ zeroQueueReconnectedPolicy func(*partitionConsumer)
}
type ConsumerEventListener interface {
@@ -170,6 +171,7 @@ type partitionConsumer struct {
currentQueueSize uAtomic.Int32
scaleReceiverQueueHint uAtomic.Bool
incomingMessages uAtomic.Int32
+ reconnectCount uAtomic.Int32
eventsCh chan interface{}
connectedCh chan struct{}
@@ -1393,6 +1395,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
orderingKey: string(smm.OrderingKey),
index: messageIndex,
brokerPublishTime: brokerPublishTime,
+ conn: pc._getConn(),
}
} else {
msg = &message{
@@ -1413,6 +1416,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
orderingKey: string(msgMeta.GetOrderingKey()),
index: messageIndex,
brokerPublishTime: brokerPublishTime,
+ conn: pc._getConn(),
}
}
@@ -1541,6 +1545,7 @@ func createEncryptionContext(msgMeta *pb.MessageMetadata) *EncryptionContext {
func (pc *partitionConsumer) ConnectionClosed(closeConsumer *pb.CommandCloseConsumer) {
// Trigger reconnection in the consumer goroutine
pc.log.Debug("connection closed and send to connectClosedCh")
+ pc.reconnectCount.Inc()
var assignedBrokerURL string
if closeConsumer != nil {
assignedBrokerURL = pc.client.selectServiceURL(
@@ -1925,9 +1930,8 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
// Successfully reconnected
pc.log.Info("Reconnected consumer to broker")
bo.Reset()
- if pc.options.enableZeroQueueConsumer {
-     pc.log.Info("zeroQueueConsumer reconnect, reset availablePermits")
-     pc.availablePermits.inc()
+ if pc.options.enableZeroQueueConsumer && pc.options.zeroQueueReconnectedPolicy != nil {
+     pc.options.zeroQueueReconnectedPolicy(pc)
  }
return struct{}{}, nil
}
diff --git a/pulsar/consumer_partition_test.go b/pulsar/consumer_partition_test.go
index 4594c148..31877693 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -37,6 +37,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
metrics: newTestMetrics(),
decryptor: crypto.NewNoopDecryptor(),
}
+ pc._setConn(dummyConnection{})
pc.availablePermits = &availablePermits{pc: &pc}
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
	func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
@@ -76,6 +77,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
metrics: newTestMetrics(),
decryptor: crypto.NewNoopDecryptor(),
}
+ pc._setConn(dummyConnection{})
pc.availablePermits = &availablePermits{pc: &pc}
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
	func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
@@ -112,6 +114,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
metrics: newTestMetrics(),
decryptor: crypto.NewNoopDecryptor(),
}
+ pc._setConn(dummyConnection{})
pc.availablePermits = &availablePermits{pc: &pc}
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
	func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
diff --git a/pulsar/consumer_zero_queue.go b/pulsar/consumer_zero_queue.go
index 5b85df8e..97016c38 100644
--- a/pulsar/consumer_zero_queue.go
+++ b/pulsar/consumer_zero_queue.go
@@ -23,6 +23,8 @@ import (
"sync"
"time"
+ uAtomic "go.uber.org/atomic"
+
"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/log"
"github.com/pkg/errors"
@@ -36,6 +38,7 @@ type zeroQueueConsumer struct {
pc *partitionConsumer
consumerName string
disableForceTopicCreation bool
+ waitingOnReceive uAtomic.Bool
messageCh chan ConsumerMessage
@@ -71,11 +74,17 @@ func newZeroConsumer(client *client, options ConsumerOptions, topic string,
return nil, err
}
opts := newPartitionConsumerOpts(zc.topic, zc.consumerName, tn.Partition, zc.options)
- conn, err := newPartitionConsumer(zc, zc.client, opts, zc.messageCh, zc.dlq, zc.metrics)
+ opts.zeroQueueReconnectedPolicy = func(pc *partitionConsumer) {
+ if zc.waitingOnReceive.Load() {
+ pc.log.Info("zeroQueueConsumer reconnect, reset
availablePermits")
+ pc.availablePermits.inc()
+ }
+ }
+ pc, err := newPartitionConsumer(zc, zc.client, opts, zc.messageCh, zc.dlq, zc.metrics)
if err != nil {
return nil, err
}
- zc.pc = conn
+ zc.pc = pc
return zc, nil
}
@@ -119,17 +128,26 @@ func (z *zeroQueueConsumer) Receive(ctx context.Context) (Message, error) {
}
z.Lock()
defer z.Unlock()
+ z.waitingOnReceive.Store(true)
z.pc.availablePermits.inc()
for {
select {
case <-z.closeCh:
+ z.waitingOnReceive.Store(false)
return nil, newError(ConsumerClosed, "consumer closed")
case cm, ok := <-z.messageCh:
if !ok {
return nil, newError(ConsumerClosed, "consumer closed")
}
- return cm.Message, nil
+ message, ok := cm.Message.(*message)
+ if ok && message.getConn().ID() == z.pc._getConn().ID()
{
+ z.waitingOnReceive.Store(false)
+ return cm.Message, nil
+ } else {
+ z.log.WithField("messageID", cm.Message.ID()).Warn("message from old connection discarded after reconnection")
+ }
case <-ctx.Done():
+ z.waitingOnReceive.Store(false)
return nil, ctx.Err()
}
}
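
The Receive() change above works together with the new conn field on message (see the impl_message.go hunk at the end of this diff): each message records the connection it was read from, and Receive() discards anything delivered on a connection other than the consumer's current one. A minimal sketch of that guard, again with simplified stand-in types rather than the client's real internal.Connection API:

    package main

    import "fmt"

    // Hypothetical stand-ins for internal.Connection and message (illustration only).
    type connection struct{ id uint64 }

    func (c *connection) ID() uint64 { return c.id }

    type message struct {
    	payload string
    	conn    *connection // connection the message was read from
    }

    // receive drains the channel, discarding messages that arrived on any
    // connection other than the current (post-reconnect) one.
    func receive(ch <-chan *message, current *connection) (*message, bool) {
    	for m := range ch {
    		if m.conn.ID() == current.ID() {
    			return m, true
    		}
    		fmt.Printf("discarded %q: delivered on stale connection %d\n", m.payload, m.conn.ID())
    	}
    	return nil, false
    }

    func main() {
    	stale := &connection{id: 1} // connection before the reconnect
    	live := &connection{id: 2}  // connection after the reconnect

    	ch := make(chan *message, 2)
    	ch <- &message{payload: "pre-reconnect", conn: stale}
    	ch <- &message{payload: "post-reconnect", conn: live}
    	close(ch)

    	if m, ok := receive(ch, live); ok {
    		fmt.Println("received:", m.payload) // received: post-reconnect
    	}
    }
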
diff --git a/pulsar/consumer_zero_queue_test.go b/pulsar/consumer_zero_queue_test.go
index 444751a1..0b3e8760 100644
--- a/pulsar/consumer_zero_queue_test.go
+++ b/pulsar/consumer_zero_queue_test.go
@@ -243,6 +243,148 @@ func TestReconnectConsumer(t *testing.T) {
defer c.Terminate(ctx)
}
+func TestReconnectedBrokerSendPermits(t *testing.T) {
+ req := testcontainers.ContainerRequest{
+ Name: "pulsar-test",
+ Image: getPulsarTestImage(),
+ ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+ WaitingFor: wait.ForExposedPort(),
+ HostConfigModifier: func(config *container.HostConfig) {
+ config.PortBindings = map[nat.Port][]nat.PortBinding{
+ "6650/tcp": {{HostIP: "0.0.0.0", HostPort:
"6659"}},
+ "8080/tcp": {{HostIP: "0.0.0.0", HostPort:
"8089"}},
+ }
+ },
+ Cmd: []string{"bin/pulsar", "standalone", "-nfw"},
+ }
+ c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
+ ContainerRequest: req,
+ Started: true,
+ Reuse: true,
+ })
+ require.NoError(t, err, "Failed to start the pulsar container")
+ endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+ require.NoError(t, err, "Failed to get the pulsar endpoint")
+
+ sLogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
+ client, err := NewClient(ClientOptions{
+ URL: endpoint,
+ Logger: plog.NewLoggerWithSlog(sLogger),
+ })
+ assert.Nil(t, err)
+ adminEndpoint, err := c.PortEndpoint(context.Background(), "8080", "http")
+ assert.Nil(t, err)
+ admin, err := pulsaradmin.NewClient(&config.Config{
+ WebServiceURL: adminEndpoint,
+ })
+ assert.Nil(t, err)
+
+ topic := newTopicName()
+ var consumer Consumer
+ require.Eventually(t, func() bool {
+ consumer, err = client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ EnableZeroQueueConsumer: true,
+ Type: Shared, // use the Shared subscription type so unacked-message stats are reported
+ })
+ return err == nil
+ }, 30*time.Second, 1*time.Second)
+ ctx := context.Background()
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: false,
+ })
+ assert.Nil(t, err)
+
+ // send 10 messages
+ for i := 0; i < 10; i++ {
+ msg, err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ Key: "pulsar",
+ Properties: map[string]string{
+ "key-1": "pulsar-1",
+ },
+ })
+ assert.Nil(t, err)
+ log.Printf("send message: %s", msg.String())
+ }
+
+ log.Println("unloading topic")
+ topicName, err := utils.GetTopicName(topic)
+ assert.Nil(t, err)
+ err = admin.Topics().Unload(*topicName)
+ assert.Nil(t, err)
+ log.Println("unloaded topic")
+ zc, ok := consumer.(*zeroQueueConsumer)
+ assert.True(t, ok)
+ // wait for reconnect
+ require.EventuallyWithT(t, func(c *assert.CollectT) {
+ reconnectCount := zc.pc.reconnectCount.Load()
+ require.Equal(c, reconnectCount, int32(1))
+ }, 30*time.Second, 1*time.Second)
+
+ // receive 10 messages
+ for i := 0; i < 10; i++ {
+ msg, err := consumer.Receive(context.Background())
+ require.NoError(t, err)
+
+ expectMsg := fmt.Sprintf("hello-%d", i)
+ expectProperties := map[string]string{
+ "key-1": "pulsar-1",
+ }
+ assert.Equal(t, []byte(expectMsg), msg.Payload())
+ assert.Equal(t, "pulsar", msg.Key())
+ assert.Equal(t, expectProperties, msg.Properties())
+ // ack message
+ err = consumer.Ack(msg)
+ assert.Nil(t, err)
+ log.Printf("receive message: %s", msg.ID().String())
+ }
+ // send one more message and we do not manually receive it
+ _, err = producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", 10)),
+ Key: "pulsar",
+ Properties: map[string]string{
+ "key-1": "pulsar-1",
+ },
+ })
+ assert.Nil(t, err)
+ // wait for the broker to deliver messages to the consumer and for the topic stats to update
+ option := utils.GetStatsOptions{
+ GetPreciseBacklog: true,
+ }
+ require.EventuallyWithT(t, func(c *assert.CollectT) {
+ topicStats, err := admin.Topics().GetStatsWithOptionWithContext(ctx, *topicName, option)
+ require.Nil(c, err)
+ for _, subscriptionStats := range topicStats.Subscriptions {
+ require.Equal(c, subscriptionStats.MsgBacklog, int64(1))
+ require.Equal(c, subscriptionStats.Consumers[0].UnAckedMessages, 0)
+ }
+ }, 30*time.Second, 1*time.Second)
+
+ // receive and ack the remaining message
+ msg, err := consumer.Receive(context.Background())
+ assert.Nil(t, err)
+ err = consumer.Ack(msg)
+ assert.Nil(t, err)
+
+ // check topic stats
+ require.EventuallyWithT(t, func(c *assert.CollectT) {
+ topicStats, err := admin.Topics().GetStatsWithOptionWithContext(ctx, *topicName, option)
+ require.Nil(c, err)
+ for _, subscriptionStats := range topicStats.Subscriptions {
+ require.Equal(c, subscriptionStats.MsgBacklog, int64(0))
+ require.Equal(c, subscriptionStats.Consumers[0].UnAckedMessages, 0)
+ }
+ }, 30*time.Second, 1*time.Second)
+
+}
+
func TestUnloadTopicBeforeConsume(t *testing.T) {
sLogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index f32cc7fc..4f314c3d 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -26,6 +26,8 @@ import (
"sync/atomic"
"time"
+ "github.com/apache/pulsar-client-go/pulsar/internal"
+
"google.golang.org/protobuf/proto"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
@@ -313,6 +315,7 @@ type message struct {
encryptionContext *EncryptionContext
index *uint64
brokerPublishTime *time.Time
+ conn internal.Connection
}
func (msg *message) Topic() string {
@@ -394,6 +397,10 @@ func (msg *message) size() int {
return len(msg.payLoad)
}
+func (msg *message) getConn() internal.Connection {
+ return msg.conn
+}
+
func newAckTracker(size uint) *ackTracker {
batchIDs := bitset.New(size)
for i := uint(0); i < size; i++ {