This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new fa2b263d Fix partition update failure causing existing producers to close (#1437)
fa2b263d is described below

commit fa2b263d9f6dae72128945d277d8f9d25fbd7410
Author: Zike Yang <[email protected]>
AuthorDate: Tue Nov 18 13:03:14 2025 +0800

    Fix partition update failure causing existing producers to close (#1437)
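    A minimal, hypothetical sketch (invented names, not part of this patch) of the pattern the
    fix relies on: the partition producers slice is held in an atomic.Value and only swapped in
    after a successful rebuild, so a failed partition update leaves the existing producers
    untouched instead of closing them.

    package main

    import (
            "errors"
            "fmt"
            "sync/atomic"
    )

    // producerSet stands in for the partitioned producer here; the stored
    // []string plays the role of the []Producer slice in the real client.
    type producerSet struct {
            producers atomic.Value
    }

    func (p *producerSet) load() []string {
            if v := p.producers.Load(); v != nil {
                    return v.([]string)
            }
            return nil
    }

    // update swaps in the new slice only when the whole rebuild succeeded,
    // so a failed update leaves the previously stored producers in place.
    func (p *producerSet) update(next []string, err error) {
            if err != nil {
                    return
            }
            p.producers.Store(next)
    }

    func main() {
            var p producerSet
            p.update([]string{"partition-0"}, nil)
            p.update([]string{"partition-0", "partition-1"}, errors.New("partition lookup failed"))
            fmt.Println(p.load()) // prints [partition-0]: the old producer set survives the failure
    }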
---
 pulsar/message_chunking_test.go |  2 +-
 pulsar/producer_impl.go         | 74 ++++++++++++++++++++---------------------
 pulsar/producer_test.go         | 64 ++++++++++++++++++++++++++++++++---
 3 files changed, 96 insertions(+), 44 deletions(-)

diff --git a/pulsar/message_chunking_test.go b/pulsar/message_chunking_test.go
index c2e1113d..b85e914f 100644
--- a/pulsar/message_chunking_test.go
+++ b/pulsar/message_chunking_test.go
@@ -544,7 +544,7 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) {
                Payload: []byte(fmt.Sprintf("chunk-%s-%d|", uuid, chunkID)),
        }
        wholePayload := msg.Payload
-       producerImpl := p.(*producer).producers[0].(*partitionProducer)
+       producerImpl := p.(*producer).getProducer(0).(*partitionProducer)
        mm := producerImpl.genMetadata(msg, len(wholePayload), time.Now())
        mm.Uuid = proto.String(uuid)
        mm.NumChunksFromMsg = proto.Int32(int32(totalChunks))
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 8e970d28..a464f14c 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -23,7 +23,6 @@ import (
        "sync"
        "sync/atomic"
        "time"
-       "unsafe"
 
        "github.com/apache/pulsar-client-go/pulsar/crypto"
        "github.com/apache/pulsar-client-go/pulsar/internal"
@@ -48,13 +47,10 @@ const (
 )
 
 type producer struct {
-       sync.RWMutex
        client        *client
        options       *ProducerOptions
        topic         string
-       producers     []Producer
-       producersPtr  unsafe.Pointer
-       numPartitions uint32
+       producers     atomic.Value
        messageRouter func(*ProducerMessage, TopicMetadata) int
        closeOnce     sync.Once
        stopDiscovery func()
@@ -195,10 +191,7 @@ func (p *producer) internalCreatePartitionsProducers() error {
        oldNumPartitions := 0
        newNumPartitions := len(partitions)
 
-       p.Lock()
-       defer p.Unlock()
-
-       oldProducers := p.producers
+       oldProducers := p.getProducers()
        oldNumPartitions = len(oldProducers)
 
        if oldProducers != nil {
@@ -213,14 +206,14 @@ func (p *producer) internalCreatePartitionsProducers() error {
 
        }
 
-       p.producers = make([]Producer, newNumPartitions)
+       producers := make([]Producer, newNumPartitions)
 
        // When for some reason (eg: forced deletion of sub partition) causes oldNumPartitions> newNumPartitions,
        // we need to rebuild the cache of new producers, otherwise the array will be out of bounds.
        if oldProducers != nil && oldNumPartitions < newNumPartitions {
                // Copy over the existing consumer instances
                for i := 0; i < oldNumPartitions; i++ {
-                       p.producers[i] = oldProducers[i]
+                       producers[i] = oldProducers[i]
                }
        }
 
@@ -251,20 +244,23 @@ func (p *producer) internalCreatePartitionsProducers() error {
                }(partitionIdx, partition)
        }
 
+       var newProducers []Producer
+
        for i := 0; i < partitionsToAdd; i++ {
                pe, ok := <-c
                if ok {
                        if pe.err != nil {
                                err = pe.err
                        } else {
-                               p.producers[pe.partition] = pe.prod
+                               producers[pe.partition] = pe.prod
+                               newProducers = append(newProducers, pe.prod)
                        }
                }
        }
 
        if err != nil {
                // Since there were some failures, cleanup all the partitions that succeeded in creating the producers
-               for _, producer := range p.producers {
+               for _, producer := range newProducers {
                        if producer != nil {
                                producer.Close()
                        }
@@ -277,8 +273,7 @@ func (p *producer) internalCreatePartitionsProducers() error {
        } else {
                p.metrics.ProducersPartitions.Add(float64(partitionsToAdd))
        }
-       atomic.StorePointer(&p.producersPtr, unsafe.Pointer(&p.producers))
-       atomic.StoreUint32(&p.numPartitions, uint32(len(p.producers)))
+       p.producers.Store(producers)
        return nil
 }
 
@@ -287,14 +282,11 @@ func (p *producer) Topic() string {
 }
 
 func (p *producer) Name() string {
-       p.RLock()
-       defer p.RUnlock()
-
-       return p.producers[0].Name()
+       return p.getProducer(0).Name()
 }
 
 func (p *producer) NumPartitions() uint32 {
-       return atomic.LoadUint32(&p.numPartitions)
+       return uint32(len(p.getProducers()))
 }
 
 func (p *producer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
@@ -306,11 +298,11 @@ func (p *producer) SendAsync(ctx context.Context, msg *ProducerMessage,
        p.getPartition(msg).SendAsync(ctx, msg, callback)
 }
 
-func (p *producer) getPartition(msg *ProducerMessage) Producer {
-       // Since partitions can only increase, it's ok if the producers list
-       // is updated in between. The numPartition is updated only after the list.
-       partition := p.messageRouter(msg, p)
-       producers := *(*[]Producer)(atomic.LoadPointer(&p.producersPtr))
+func (p *producer) getProducer(partition int) Producer {
+       producers := p.getProducers()
+       if len(producers) == 0 {
+               panic("producer has not been initialized properly")
+       }
        if partition >= len(producers) {
                // We read the old producers list while the count was already
                // updated
@@ -319,12 +311,23 @@ func (p *producer) getPartition(msg *ProducerMessage) Producer {
        return producers[partition]
 }
 
-func (p *producer) LastSequenceID() int64 {
-       p.RLock()
-       defer p.RUnlock()
+func (p *producer) getProducers() []Producer {
+       if producers := p.producers.Load(); producers != nil {
+               return producers.([]Producer)
+       }
+       return []Producer{}
+}
+
+func (p *producer) getPartition(msg *ProducerMessage) Producer {
+       // Since partitions can only increase, it's ok if the producers list
+       // is updated in between. The numPartition is updated only after the list.
+       partition := p.messageRouter(msg, p)
+       return p.getProducer(partition)
+}
 
+func (p *producer) LastSequenceID() int64 {
        var maxSeq int64 = -1
-       for _, pp := range p.producers {
+       for _, pp := range p.getProducers() {
                s := pp.LastSequenceID()
                if s > maxSeq {
                        maxSeq = s
@@ -338,10 +341,7 @@ func (p *producer) Flush() error {
 }
 
 func (p *producer) FlushWithCtx(ctx context.Context) error {
-       p.RLock()
-       defer p.RUnlock()
-
-       for _, pp := range p.producers {
+       for _, pp := range p.getProducers() {
                if err := pp.FlushWithCtx(ctx); err != nil {
                        return err
                }
@@ -354,14 +354,12 @@ func (p *producer) Close() {
        p.closeOnce.Do(func() {
                p.stopDiscovery()
 
-               p.Lock()
-               defer p.Unlock()
-
-               for _, pp := range p.producers {
+               producers := p.getProducers()
+               for _, pp := range producers {
                        pp.Close()
                }
                p.client.handlers.Del(p)
-               p.metrics.ProducersPartitions.Sub(float64(len(p.producers)))
+               p.metrics.ProducersPartitions.Sub(float64(len(producers)))
                p.metrics.ProducersClosed.Inc()
        })
 }
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 00876070..30805942 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -30,6 +30,9 @@ import (
        "testing"
        "time"
 
+       "github.com/apache/pulsar-client-go/pulsaradmin"
+       "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
+       "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
        "github.com/stretchr/testify/require"
        "github.com/testcontainers/testcontainers-go"
        "github.com/testcontainers/testcontainers-go/wait"
@@ -1382,7 +1385,7 @@ func TestProducerWithBackoffPolicy(t *testing.T) {
        assert.Nil(t, err)
        defer _producer.Close()
 
-       partitionProducerImp := _producer.(*producer).producers[0].(*partitionProducer)
+       partitionProducerImp := _producer.(*producer).getProducer(0).(*partitionProducer)
        // 1 s
        startTime := time.Now()
        partitionProducerImp.reconnectToBroker(nil)
@@ -2477,7 +2480,7 @@ func TestFailPendingMessageWithClose(t *testing.T) {
                        }
                })
        }
-       partitionProducerImp := testProducer.(*producer).producers[0].(*partitionProducer)
+       partitionProducerImp := testProducer.(*producer).getProducer(0).(*partitionProducer)
        partitionProducerImp.pendingQueue.Put(&pendingItem{
                buffer: buffersPool.GetBuffer(0),
        })
@@ -2597,7 +2600,7 @@ func TestDisableReplication(t *testing.T) {
                writtenBuffers: &writtenBuffers,
        }
 
-       partitionProducerImp := testProducer.(*producer).producers[0].(*partitionProducer)
+       partitionProducerImp := testProducer.(*producer).getProducer(0).(*partitionProducer)
        partitionProducerImp.pendingQueue = pqw
 
        ID, err := testProducer.Send(context.Background(), &ProducerMessage{
@@ -2718,7 +2721,7 @@ func TestSelectConnectionForSameProducer(t *testing.T) {
        assert.NoError(t, err)
        defer _producer.Close()
 
-       partitionProducerImp := _producer.(*producer).producers[0].(*partitionProducer)
+       partitionProducerImp := _producer.(*producer).getProducer(0).(*partitionProducer)
        conn := partitionProducerImp._getConn()
 
        for i := 0; i < 5; i++ {
@@ -2762,7 +2765,7 @@ func TestSendBufferRetainWhenConnectionStuck(t *testing.T) {
                Topic: topicName,
        })
        assert.NoError(t, err)
-       pp := p.(*producer).producers[0].(*partitionProducer)
+       pp := p.(*producer).getProducer(0).(*partitionProducer)
 
        // Create a mock connection that tracks written buffers
        conn := &mockConn{
@@ -2898,3 +2901,54 @@ func testSendAsyncCouldTimeoutWhileReconnecting(t *testing.T, isDisableBatching
        }
        close(finalErr)
 }
+
+type mockRPCClient struct {
+       internal.RPCClient
+}
+
+func (m *mockRPCClient) RequestOnCnx(_ internal.Connection, _ uint64, _ pb.BaseCommand_Type,
+       _ proto.Message) (*internal.RPCResult, error) {
+       return nil, fmt.Errorf("expected error")
+}
+
+func TestPartitionUpdateFailed(t *testing.T) {
+       topicName := newTopicName()
+
+       admin, err := pulsaradmin.NewClient(&config.Config{
+               WebServiceURL: adminURL,
+       })
+       require.NoError(t, err)
+
+       tn, err := utils.GetTopicName(topicName)
+       require.NoError(t, err)
+       require.NoError(t, admin.Topics().Create(*tn, 1))
+
+       c, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       require.NoError(t, err)
+       p, err := c.CreateProducer(ProducerOptions{
+               Topic:                           topicName,
+               PartitionsAutoDiscoveryInterval: time.Second * 1,
+       })
+       require.NoError(t, err)
+       _, err = p.Send(context.Background(), &ProducerMessage{
+               Payload: []byte("test"),
+       })
+       require.NoError(t, err)
+       c.(*client).rpcClient = &mockRPCClient{
+               RPCClient: c.(*client).rpcClient,
+       }
+
+       require.NoError(t, admin.Topics().Update(*tn, 2))
+
+       // Assert that partition update failed won't affect the existing producers
+       for i := 0; i < 5; i++ {
+               _, err = p.Send(context.Background(), &ProducerMessage{
+                       Payload: []byte("test"),
+               })
+               require.NoError(t, err)
+
+               time.Sleep(time.Second * 1)
+       }
+}
