This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new 6188199 add Name method to Consumer interface (#321) 6188199 is described below commit 6188199e6b6260ef8fc9cc1b3350480116b3ca67 Author: Shohi Wang <osh...@gmail.com> AuthorDate: Sat Jul 11 00:56:55 2020 +0800 add Name method to Consumer interface (#321) --- pulsar/consumer.go | 3 +++ pulsar/consumer_impl.go | 16 ++++++++++------ pulsar/consumer_multitopic.go | 21 ++++++++++++++------- pulsar/consumer_regex.go | 10 +++++++++- pulsar/consumer_test.go | 25 +++++++++++++++++++++++++ 5 files changed, 61 insertions(+), 14 deletions(-) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index 97020b6..8d3c771 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -197,4 +197,7 @@ type Consumer interface { // the message publish time where to reposition the subscription // SeekByTime(time time.Time) error + + // Name returns the name of consumer. + Name() string } diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 20dc1af..f9ee004 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -74,6 +74,10 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { options.ReceiverQueueSize = 1000 } + if options.Name == "" { + options.Name = generateRandomName() + } + // did the user pass in a message channel? messageCh := options.MessageChannel if options.MessageChannel == nil { @@ -136,12 +140,7 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string, errorCh: make(chan error), dlq: dlq, log: log.WithField("topic", topic), - } - - if options.Name != "" { - consumer.consumerName = options.Name - } else { - consumer.consumerName = generateRandomName() + consumerName: options.Name, } err := consumer.internalTopicSubscribeToPartitions() @@ -166,6 +165,11 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string, return consumer, nil } +// Name returns the name of consumer. +func (c *consumer) Name() string { + return c.consumerName +} + func (c *consumer) internalTopicSubscribeToPartitions() error { partitions, err := c.client.TopicPartitions(c.topic) if err != nil { diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go index 94e7b9d..a5386cb 100644 --- a/pulsar/consumer_multitopic.go +++ b/pulsar/consumer_multitopic.go @@ -32,7 +32,8 @@ import ( type multiTopicConsumer struct { options ConsumerOptions - messageCh chan ConsumerMessage + consumerName string + messageCh chan ConsumerMessage consumers map[string]Consumer @@ -46,12 +47,13 @@ type multiTopicConsumer struct { func newMultiTopicConsumer(client *client, options ConsumerOptions, topics []string, messageCh chan ConsumerMessage, dlq *dlqRouter) (Consumer, error) { mtc := &multiTopicConsumer{ - options: options, - messageCh: messageCh, - consumers: make(map[string]Consumer, len(topics)), - closeCh: make(chan struct{}), - dlq: dlq, - log: &log.Entry{}, + options: options, + messageCh: messageCh, + consumers: make(map[string]Consumer, len(topics)), + closeCh: make(chan struct{}), + dlq: dlq, + log: &log.Entry{}, + consumerName: options.Name, } var errs error @@ -173,3 +175,8 @@ func (c *multiTopicConsumer) Seek(msgID MessageID) error { func (c *multiTopicConsumer) SeekByTime(time time.Time) error { return errors.New("seek command not allowed for multi topic consumer") } + +// Name returns the name of consumer. +func (c *multiTopicConsumer) Name() string { + return c.consumerName +} diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index 3d0aebe..75ca657 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -59,6 +59,8 @@ type regexConsumer struct { ticker *time.Ticker log *log.Entry + + consumerName string } func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, pattern *regexp.Regexp, @@ -78,7 +80,8 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, p closeCh: make(chan struct{}), - log: log.WithField("topic", tn.Name), + log: log.WithField("topic", tn.Name), + consumerName: opts.Name, } topics, err := rc.topics() @@ -222,6 +225,11 @@ func (c *regexConsumer) SeekByTime(time time.Time) error { return errors.New("seek command not allowed for regex consumer") } +// Name returns the name of consumer. +func (c *regexConsumer) Name() string { + return c.consumerName +} + func (c *regexConsumer) closed() bool { select { case <-c.closeCh: diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index d598dd3..4031f7d 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1342,3 +1342,28 @@ func TestProducerName(t *testing.T) { consumer.Ack(msg) } } + +func TestConsumerName(t *testing.T) { + assert := assert.New(t) + + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + assert.Nil(err) + defer client.Close() + + topic := newTopicName() + + // create consumer + consumerName := "test-consumer-name" + consumer, err := client.Subscribe(ConsumerOptions{ + Name: consumerName, + Topic: topic, + SubscriptionName: "my-sub", + }) + + assert.Nil(err) + defer consumer.Close() + + assert.Equal(consumerName, consumer.Name()) +}