This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 6188199  add Name method to Consumer interface (#321)
6188199 is described below

commit 6188199e6b6260ef8fc9cc1b3350480116b3ca67
Author: Shohi Wang <osh...@gmail.com>
AuthorDate: Sat Jul 11 00:56:55 2020 +0800

    add Name method to Consumer interface (#321)
---
 pulsar/consumer.go            |  3 +++
 pulsar/consumer_impl.go       | 16 ++++++++++------
 pulsar/consumer_multitopic.go | 21 ++++++++++++++-------
 pulsar/consumer_regex.go      | 10 +++++++++-
 pulsar/consumer_test.go       | 25 +++++++++++++++++++++++++
 5 files changed, 61 insertions(+), 14 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 97020b6..8d3c771 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -197,4 +197,7 @@ type Consumer interface {
        //            the message publish time where to reposition the subscription
        //
        SeekByTime(time time.Time) error
+
+       // Name returns the name of consumer.
+       Name() string
 }
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 20dc1af..f9ee004 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -74,6 +74,10 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
                options.ReceiverQueueSize = 1000
        }
 
+       if options.Name == "" {
+               options.Name = generateRandomName()
+       }
+
        // did the user pass in a message channel?
        messageCh := options.MessageChannel
        if options.MessageChannel == nil {
@@ -136,12 +140,7 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string,
                errorCh:                   make(chan error),
                dlq:                       dlq,
                log:                       log.WithField("topic", topic),
-       }
-
-       if options.Name != "" {
-               consumer.consumerName = options.Name
-       } else {
-               consumer.consumerName = generateRandomName()
+               consumerName:              options.Name,
        }
 
        err := consumer.internalTopicSubscribeToPartitions()
@@ -166,6 +165,11 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string,
        return consumer, nil
 }
 
+// Name returns the name of consumer.
+func (c *consumer) Name() string {
+       return c.consumerName
+}
+
 func (c *consumer) internalTopicSubscribeToPartitions() error {
        partitions, err := c.client.TopicPartitions(c.topic)
        if err != nil {
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index 94e7b9d..a5386cb 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -32,7 +32,8 @@ import (
 type multiTopicConsumer struct {
        options ConsumerOptions
 
-       messageCh chan ConsumerMessage
+       consumerName string
+       messageCh    chan ConsumerMessage
 
        consumers map[string]Consumer
 
@@ -46,12 +47,13 @@ type multiTopicConsumer struct {
 func newMultiTopicConsumer(client *client, options ConsumerOptions, topics []string,
        messageCh chan ConsumerMessage, dlq *dlqRouter) (Consumer, error) {
        mtc := &multiTopicConsumer{
-               options:   options,
-               messageCh: messageCh,
-               consumers: make(map[string]Consumer, len(topics)),
-               closeCh:   make(chan struct{}),
-               dlq:       dlq,
-               log:       &log.Entry{},
+               options:      options,
+               messageCh:    messageCh,
+               consumers:    make(map[string]Consumer, len(topics)),
+               closeCh:      make(chan struct{}),
+               dlq:          dlq,
+               log:          &log.Entry{},
+               consumerName: options.Name,
        }
 
        var errs error
@@ -173,3 +175,8 @@ func (c *multiTopicConsumer) Seek(msgID MessageID) error {
 func (c *multiTopicConsumer) SeekByTime(time time.Time) error {
        return errors.New("seek command not allowed for multi topic consumer")
 }
+
+// Name returns the name of consumer.
+func (c *multiTopicConsumer) Name() string {
+       return c.consumerName
+}
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index 3d0aebe..75ca657 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -59,6 +59,8 @@ type regexConsumer struct {
        ticker *time.Ticker
 
        log *log.Entry
+
+       consumerName string
 }
 
 func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, pattern *regexp.Regexp,
@@ -78,7 +80,8 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, p
 
                closeCh: make(chan struct{}),
 
-               log: log.WithField("topic", tn.Name),
+               log:          log.WithField("topic", tn.Name),
+               consumerName: opts.Name,
        }
 
        topics, err := rc.topics()
@@ -222,6 +225,11 @@ func (c *regexConsumer) SeekByTime(time time.Time) error {
        return errors.New("seek command not allowed for regex consumer")
 }
 
+// Name returns the name of consumer.
+func (c *regexConsumer) Name() string {
+       return c.consumerName
+}
+
 func (c *regexConsumer) closed() bool {
        select {
        case <-c.closeCh:
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index d598dd3..4031f7d 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1342,3 +1342,28 @@ func TestProducerName(t *testing.T) {
                consumer.Ack(msg)
        }
 }
+
+func TestConsumerName(t *testing.T) {
+       assert := assert.New(t)
+
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(err)
+       defer client.Close()
+
+       topic := newTopicName()
+
+       // create consumer
+       consumerName := "test-consumer-name"
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Name:             consumerName,
+               Topic:            topic,
+               SubscriptionName: "my-sub",
+       })
+
+       assert.Nil(err)
+       defer consumer.Close()
+
+       assert.Equal(consumerName, consumer.Name())
+}

Reply via email to