This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 2a210ad  [ISSUE #130] Add Shutdown of PushConsumer (#172)
2a210ad is described below

commit 2a210adb138d42acf2bd3efe9f4a8ba5d830b03e
Author: wenfeng <[email protected]>
AuthorDate: Mon Aug 26 10:53:13 2019 +0800

    [ISSUE #130] Add Shutdown of PushConsumer (#172)
    
    * fix issue 130
    
    * fix CR comments
---
 api.go                         |  7 ++++++-
 consumer/consumer.go           | 13 ++++++++++---
 consumer/push_consumer.go      | 38 ++++++++++++++++++++++++++++++++++++--
 examples/consumer/pull/main.go | 13 ++++++++++---
 4 files changed, 62 insertions(+), 9 deletions(-)

diff --git a/api.go b/api.go
index ca54668..931dbcf 100644
--- a/api.go
+++ b/api.go
@@ -19,6 +19,7 @@ package rocketmq
 
 import (
        "context"
+       "github.com/pkg/errors"
 
        "github.com/apache/rocketmq-client-go/consumer"
        "github.com/apache/rocketmq-client-go/primitive"
@@ -127,6 +128,10 @@ type PullConsumer interface {
        //Resume(mqs ...primitive.MessageQueue) error
 }
 
+// The PullConsumer has not implemented completely, if you want have an experience of PullConsumer, you could use
+// consumer.NewPullConsumer(...), but it may changed in the future.
+//
+// The PullConsumer will be supported in next release
 func NewPullConsumer(opts ...consumer.Option) (PullConsumer, error) {
-       return nil, nil
+       return nil, errors.New("pull consumer has not supported")
 }
diff --git a/consumer/consumer.go b/consumer/consumer.go
index b3454a6..0564eaf 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -292,9 +292,17 @@ func (dc *defaultConsumer) start() error {
 }
 
 func (dc *defaultConsumer) shutdown() error {
-       dc.state = internal.StateRunning
+       dc.state = internal.StateShutdown
+       mqs := make([]*primitive.MessageQueue, 0)
+       dc.processQueueTable.Range(func(key, value interface{}) bool {
+               k := key.(primitive.MessageQueue)
+               pq := value.(*processQueue)
+               pq.dropped = true
+               mqs = append(mqs, &k)
+               return true
+       })
+       dc.storage.persist(mqs)
        dc.client.Shutdown()
-
        return nil
 }
 
@@ -397,7 +405,6 @@ func (dc *defaultConsumer) doBalance() {
                                        "topic=%s, clientId=%s, mqAllSize=%d, cidAllSize=%d, rebalanceResultSize=%d, "+
                                        "rebalanceResultSet=%v", dc.consumerGroup, topic, dc.client.ClientID(), len(mqAll),
                                        len(cidAll), len(allocateResult), allocateResult)
-
                        }
                }
                return true
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index e8aabdc..dfee161 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -164,7 +164,7 @@ func (pc *pushConsumer) Start() error {
 }
 
 func (pc *pushConsumer) Shutdown() error {
-       return nil
+       return pc.defaultConsumer.shutdown()
 }
 
 func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
@@ -220,7 +220,41 @@ func (pc *pushConsumer) IsUnitMode() bool {
 }
 
 func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*primitive.MessageQueue) {
-       // TODO
+       v, exit := pc.subscriptionDataTable.Load(topic)
+       if !exit {
+               return
+       }
+       data := v.(*internal.SubscriptionData)
+       newVersion := time.Now().UnixNano()
+       rlog.Infof("the MessageQueue changed, also update version: %d to %d", data.SubVersion, newVersion)
+       data.SubVersion = newVersion
+
+       // TODO: optimize
+       count := 0
+       pc.processQueueTable.Range(func(key, value interface{}) bool {
+               count++
+               return true
+       })
+       if count > 0 {
+               if pc.option.PullThresholdForTopic != -1 {
+                       newVal := pc.option.PullThresholdForTopic / count
+                       if newVal == 0 {
+                               newVal = 1
+                       }
+                       rlog.Info("The PullThresholdForTopic is changed from %d to %d", pc.option.PullThresholdForTopic, newVal)
+                       pc.option.PullThresholdForTopic = newVal
+               }
+
+               if pc.option.PullThresholdSizeForTopic != -1 {
+                       newVal := pc.option.PullThresholdSizeForTopic / count
+                       if newVal == 0 {
+                               newVal = 1
+                       }
+                       rlog.Info("The PullThresholdSizeForTopic is changed from %d to %d", pc.option.PullThresholdSizeForTopic, newVal)
+                       pc.option.PullThresholdSizeForTopic = newVal
+               }
+       }
+       pc.client.SendHeartbeatToAllBrokerWithLock()
 }
 
 func (pc *pushConsumer) validate() {
diff --git a/examples/consumer/pull/main.go b/examples/consumer/pull/main.go
index c2915f9..d09da48 100644
--- a/examples/consumer/pull/main.go
+++ b/examples/consumer/pull/main.go
@@ -22,6 +22,7 @@ import (
        "fmt"
        "time"
 
+       "github.com/apache/rocketmq-client-go"
        "github.com/apache/rocketmq-client-go/consumer"
        "github.com/apache/rocketmq-client-go/internal/utils"
        "github.com/apache/rocketmq-client-go/primitive"
@@ -29,14 +30,20 @@ import (
 )
 
 func main() {
-       c, err := consumer.NewPullConsumer(consumer.WithGroupName("testGroup"), consumer.WithNameServer([]string{"127.0.0.1:9876"}))
+       c, err := rocketmq.NewPullConsumer(
+               consumer.WithGroupName("testGroup"),
+               consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+       )
+       if err != nil {
+               rlog.Fatal("fail to new pullConsumer: ", err)
+       }
+       err = c.Start()
        if err != nil {
                rlog.Fatal("fail to new pullConsumer: ", err)
        }
-       c.Start()
 
        ctx := context.Background()
-       queue := &primitive.MessageQueue{
+       queue := primitive.MessageQueue{
                Topic:      "TopicTest",
                BrokerName: "", // replace with your broker name. otherwise, pull will failed.
                QueueId:    0,

Reply via email to