This is an automated email from the ASF dual-hosted git repository.

rfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 01a924f  modify producer reconnect bug ,fix 638 (#639)
01a924f is described below

commit 01a924f51189b6fa4128a0970281d4f69b05ba4a
Author: baomingyu <[email protected]>
AuthorDate: Thu Oct 14 10:12:55 2021 +0800

    modify producer reconnect bug ,fix 638 (#639)
    
    * modify producer reconnect bug ,fix 638
    
    * modfiy check style
---
 pulsar/producer_partition.go | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index e1762f9..b2b9273 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -68,12 +68,13 @@ type partitionProducer struct {
        log    log.Logger
        cnx    internal.Connection
 
-       options             *ProducerOptions
-       producerName        string
-       producerID          uint64
-       batchBuilder        internal.BatchBuilder
-       sequenceIDGenerator *uint64
-       batchFlushTicker    *time.Ticker
+       options                  *ProducerOptions
+       producerName             string
+       userProvidedProducerName bool
+       producerID               uint64
+       batchBuilder             internal.BatchBuilder
+       sequenceIDGenerator      *uint64
+       batchFlushTicker         *time.Ticker
 
        // Channel where app is posting messages to be published
        eventsChan      chan interface{}
@@ -134,6 +135,9 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 
        if options.Name != "" {
                p.producerName = options.Name
+               p.userProvidedProducerName = true
+       } else {
+               p.userProvidedProducerName = false
        }
 
        encryption := options.Encryption
@@ -204,8 +208,6 @@ func (p *partitionProducer) grabCnx() error {
                p.log.Debug("The partition consumer schema is nil")
        }
 
-       userProvidedProducerName := p.producerName != ""
-
        cmdProducer := &pb.CommandProducer{
                RequestId:                proto.Uint64(id),
                Topic:                    proto.String(p.topic),
@@ -213,7 +215,7 @@ func (p *partitionProducer) grabCnx() error {
                ProducerId:               proto.Uint64(p.producerID),
                Schema:                   pbSchema,
                Epoch:                    proto.Uint64(atomic.LoadUint64(&p.epoch)),
-               UserProvidedProducerName: proto.Bool(userProvidedProducerName),
+               UserProvidedProducerName: proto.Bool(p.userProvidedProducerName),
        }
 
        if p.producerName != "" {

Reply via email to