This is an automated email from the ASF dual-hosted git repository.
huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 2a210ad [ISSUE #130] Add Shutdown of PushConsumer (#172)
2a210ad is described below
commit 2a210adb138d42acf2bd3efe9f4a8ba5d830b03e
Author: wenfeng <[email protected]>
AuthorDate: Mon Aug 26 10:53:13 2019 +0800
[ISSUE #130] Add Shutdown of PushConsumer (#172)
* fix issue 130
* fix CR comments
---
api.go | 7 ++++++-
consumer/consumer.go | 13 ++++++++++---
consumer/push_consumer.go | 38 ++++++++++++++++++++++++++++++++++++--
examples/consumer/pull/main.go | 13 ++++++++++---
4 files changed, 62 insertions(+), 9 deletions(-)
diff --git a/api.go b/api.go
index ca54668..931dbcf 100644
--- a/api.go
+++ b/api.go
@@ -19,6 +19,7 @@ package rocketmq
import (
"context"
+ "github.com/pkg/errors"
"github.com/apache/rocketmq-client-go/consumer"
"github.com/apache/rocketmq-client-go/primitive"
@@ -127,6 +128,10 @@ type PullConsumer interface {
//Resume(mqs ...primitive.MessageQueue) error
}
+// The PullConsumer has not implemented completely, if you want have an experience of PullConsumer, you could use
+// consumer.NewPullConsumer(...), but it may changed in the future.
+//
+// The PullConsumer will be supported in next release
func NewPullConsumer(opts ...consumer.Option) (PullConsumer, error) {
- return nil, nil
+ return nil, errors.New("pull consumer has not supported")
}
diff --git a/consumer/consumer.go b/consumer/consumer.go
index b3454a6..0564eaf 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -292,9 +292,17 @@ func (dc *defaultConsumer) start() error {
}
func (dc *defaultConsumer) shutdown() error {
- dc.state = internal.StateRunning
+ dc.state = internal.StateShutdown
+ mqs := make([]*primitive.MessageQueue, 0)
+ dc.processQueueTable.Range(func(key, value interface{}) bool {
+ k := key.(primitive.MessageQueue)
+ pq := value.(*processQueue)
+ pq.dropped = true
+ mqs = append(mqs, &k)
+ return true
+ })
+ dc.storage.persist(mqs)
dc.client.Shutdown()
-
return nil
}
@@ -397,7 +405,6 @@ func (dc *defaultConsumer) doBalance() {
"topic=%s, clientId=%s, mqAllSize=%d, cidAllSize=%d, rebalanceResultSize=%d, "+
"rebalanceResultSet=%v",
dc.consumerGroup, topic, dc.client.ClientID(), len(mqAll),
len(cidAll), len(allocateResult),
allocateResult)
-
}
}
return true
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index e8aabdc..dfee161 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -164,7 +164,7 @@ func (pc *pushConsumer) Start() error {
}
func (pc *pushConsumer) Shutdown() error {
- return nil
+ return pc.defaultConsumer.shutdown()
}
func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
@@ -220,7 +220,41 @@ func (pc *pushConsumer) IsUnitMode() bool {
}
func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*primitive.MessageQueue) {
- // TODO
+ v, exit := pc.subscriptionDataTable.Load(topic)
+ if !exit {
+ return
+ }
+ data := v.(*internal.SubscriptionData)
+ newVersion := time.Now().UnixNano()
+ rlog.Infof("the MessageQueue changed, also update version: %d to %d", data.SubVersion, newVersion)
+ data.SubVersion = newVersion
+
+ // TODO: optimize
+ count := 0
+ pc.processQueueTable.Range(func(key, value interface{}) bool {
+ count++
+ return true
+ })
+ if count > 0 {
+ if pc.option.PullThresholdForTopic != -1 {
+ newVal := pc.option.PullThresholdForTopic / count
+ if newVal == 0 {
+ newVal = 1
+ }
+ rlog.Info("The PullThresholdForTopic is changed from %d to %d", pc.option.PullThresholdForTopic, newVal)
+ pc.option.PullThresholdForTopic = newVal
+ }
+
+ if pc.option.PullThresholdSizeForTopic != -1 {
+ newVal := pc.option.PullThresholdSizeForTopic / count
+ if newVal == 0 {
+ newVal = 1
+ }
+ rlog.Info("The PullThresholdSizeForTopic is changed from %d to %d", pc.option.PullThresholdSizeForTopic, newVal)
+ pc.option.PullThresholdSizeForTopic = newVal
+ }
+ }
+ pc.client.SendHeartbeatToAllBrokerWithLock()
}
func (pc *pushConsumer) validate() {
diff --git a/examples/consumer/pull/main.go b/examples/consumer/pull/main.go
index c2915f9..d09da48 100644
--- a/examples/consumer/pull/main.go
+++ b/examples/consumer/pull/main.go
@@ -22,6 +22,7 @@ import (
"fmt"
"time"
+ "github.com/apache/rocketmq-client-go"
"github.com/apache/rocketmq-client-go/consumer"
"github.com/apache/rocketmq-client-go/internal/utils"
"github.com/apache/rocketmq-client-go/primitive"
@@ -29,14 +30,20 @@ import (
)
func main() {
- c, err := consumer.NewPullConsumer(consumer.WithGroupName("testGroup"), consumer.WithNameServer([]string{"127.0.0.1:9876"}))
+ c, err := rocketmq.NewPullConsumer(
+ consumer.WithGroupName("testGroup"),
+ consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+ )
+ if err != nil {
+ rlog.Fatal("fail to new pullConsumer: ", err)
+ }
+ err = c.Start()
if err != nil {
rlog.Fatal("fail to new pullConsumer: ", err)
}
- c.Start()
ctx := context.Background()
- queue := &primitive.MessageQueue{
+ queue := primitive.MessageQueue{
Topic: "TopicTest",
BrokerName: "", // replace with your broker name. otherwise, pull will failed.
QueueId: 0,