This is an automated email from the ASF dual-hosted git repository.
wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 22b37a4 feat: graceful shutdown lockTicker (#259) (#265)
22b37a4 is described below
commit 22b37a41077013b850dd42c51982b83644f66b97
Author: wolftankk <[email protected]>
AuthorDate: Fri Oct 25 21:34:53 2019 +0800
feat: graceful shutdown lockTicker (#259) (#265)
---
consumer/push_consumer.go | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 8f72805..95f6280 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -54,6 +54,7 @@ type pushConsumer struct {
subscribedTopic map[string]string
interceptor primitive.Interceptor
queueLock *QueueLock
+ lockTicker *time.Ticker
}
func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
@@ -92,6 +93,7 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
defaultConsumer: dc,
subscribedTopic: make(map[string]string, 0),
queueLock: newQueueLock(),
+ lockTicker:
time.NewTicker(dc.option.RebalanceLockInterval),
}
dc.mqChanged = p.messageQueueChanged
if p.consumeOrderly {
@@ -131,11 +133,11 @@ func (pc *pushConsumer) Start() error {
time.Sleep(1000 * time.Millisecond)
pc.lockAll()
- t := time.NewTicker(pc.option.RebalanceLockInterval)
- for range t.C {
+ for range pc.lockTicker.C {
pc.lockAll()
}
}()
+
go func() {
// todo start clean msg expired
// TODO quit
@@ -164,6 +166,7 @@ func (pc *pushConsumer) Start() error {
}
func (pc *pushConsumer) Shutdown() error {
+ pc.lockTicker.Stop()
return pc.defaultConsumer.shutdown()
}