This is an automated email from the ASF dual-hosted git repository.
rfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 01a924f modify producer reconnect bug ,fix 638 (#639)
01a924f is described below
commit 01a924f51189b6fa4128a0970281d4f69b05ba4a
Author: baomingyu <[email protected]>
AuthorDate: Thu Oct 14 10:12:55 2021 +0800
modify producer reconnect bug ,fix 638 (#639)
* modify producer reconnect bug ,fix 638
* modfiy check style
---
pulsar/producer_partition.go | 20 +++++++++++---------
1 file changed, 11 insertions(+), 9 deletions(-)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index e1762f9..b2b9273 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -68,12 +68,13 @@ type partitionProducer struct {
log log.Logger
cnx internal.Connection
- options *ProducerOptions
- producerName string
- producerID uint64
- batchBuilder internal.BatchBuilder
- sequenceIDGenerator *uint64
- batchFlushTicker *time.Ticker
+ options *ProducerOptions
+ producerName string
+ userProvidedProducerName bool
+ producerID uint64
+ batchBuilder internal.BatchBuilder
+ sequenceIDGenerator *uint64
+ batchFlushTicker *time.Ticker
// Channel where app is posting messages to be published
eventsChan chan interface{}
@@ -134,6 +135,9 @@ func newPartitionProducer(client *client, topic string,
options *ProducerOptions
if options.Name != "" {
p.producerName = options.Name
+ p.userProvidedProducerName = true
+ } else {
+ p.userProvidedProducerName = false
}
encryption := options.Encryption
@@ -204,8 +208,6 @@ func (p *partitionProducer) grabCnx() error {
p.log.Debug("The partition consumer schema is nil")
}
- userProvidedProducerName := p.producerName != ""
-
cmdProducer := &pb.CommandProducer{
RequestId: proto.Uint64(id),
Topic: proto.String(p.topic),
@@ -213,7 +215,7 @@ func (p *partitionProducer) grabCnx() error {
ProducerId: proto.Uint64(p.producerID),
Schema: pbSchema,
Epoch:
proto.Uint64(atomic.LoadUint64(&p.epoch)),
- UserProvidedProducerName: proto.Bool(userProvidedProducerName),
+ UserProvidedProducerName:
proto.Bool(p.userProvidedProducerName),
}
if p.producerName != "" {