This is an automated email from the ASF dual-hosted git repository.

wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 22b37a4  feat: graceful shutdown lockTicker (#259) (#265)
22b37a4 is described below

commit 22b37a41077013b850dd42c51982b83644f66b97
Author: wolftankk <[email protected]>
AuthorDate: Fri Oct 25 21:34:53 2019 +0800

    feat: graceful shutdown lockTicker (#259) (#265)
---
 consumer/push_consumer.go | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 8f72805..95f6280 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -54,6 +54,7 @@ type pushConsumer struct {
        subscribedTopic              map[string]string
        interceptor                  primitive.Interceptor
        queueLock                    *QueueLock
+       lockTicker                   *time.Ticker
 }
 
 func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
@@ -92,6 +93,7 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
                defaultConsumer: dc,
                subscribedTopic: make(map[string]string, 0),
                queueLock:       newQueueLock(),
+               lockTicker:      time.NewTicker(dc.option.RebalanceLockInterval),
        }
        dc.mqChanged = p.messageQueueChanged
        if p.consumeOrderly {
@@ -131,11 +133,11 @@ func (pc *pushConsumer) Start() error {
                        time.Sleep(1000 * time.Millisecond)
                        pc.lockAll()
 
-                       t := time.NewTicker(pc.option.RebalanceLockInterval)
-                       for range t.C {
+                       for range pc.lockTicker.C {
                                pc.lockAll()
                        }
                }()
+
                go func() {
                        // todo start clean msg expired
                        // TODO quit
@@ -164,6 +166,7 @@ func (pc *pushConsumer) Start() error {
 }
 
 func (pc *pushConsumer) Shutdown() error {
+       pc.lockTicker.Stop()
        return pc.defaultConsumer.shutdown()
 }
 

Reply via email to