This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 59cd47f  add ut for defaultConsumer. resolve #178 (#179)
59cd47f is described below

commit 59cd47fe743ff7eff4b6f110ea5fbe525988b33c
Author: xujianhai666 <[email protected]>
AuthorDate: Mon Aug 26 10:51:12 2019 +0800

    add ut for defaultConsumer. resolve #178 (#179)
---
 consumer/consumer_test.go     | 138 ++++++++++++++++++++++++++++++++++++++++++
 consumer/mock_offset_store.go |  76 +++++++++++++++++++++++
 consumer/offset_store.go      |   1 +
 3 files changed, 215 insertions(+)

diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go
index a837b24..665547d 100644
--- a/consumer/consumer_test.go
+++ b/consumer/consumer_test.go
@@ -18,10 +18,17 @@ limitations under the License.
 package consumer
 
 import (
+       "sync"
        "testing"
        "time"
 
+       "github.com/golang/mock/gomock"
+       . "github.com/smartystreets/goconvey/convey"
        "github.com/stretchr/testify/assert"
+
+       "github.com/apache/rocketmq-client-go/internal"
+       "github.com/apache/rocketmq-client-go/internal/remote"
+       "github.com/apache/rocketmq-client-go/primitive"
 )
 
 func TestParseTimestamp(t *testing.T) {
@@ -30,3 +37,134 @@ func TestParseTimestamp(t *testing.T) {
        assert.Nil(t, err)
        assert.Equal(t, int64(1556652849), timestamp.Unix())
 }
+
+func TestDoRebalance(t *testing.T) {
+       Convey("Given a defaultConsumer", t, func() {
+               dc := &defaultConsumer{
+                       model: Clustering,
+               }
+
+               topic := "test"
+               broker := "127.0.0.1:8889"
+               clientID := "clientID"
+               mqs := []*primitive.MessageQueue{
+                       {
+                               Topic:      topic,
+                               BrokerName: "",
+                               QueueId:    0,
+                       },
+                       {
+                               Topic:      topic,
+                               BrokerName: "",
+                               QueueId:    1,
+                       },
+               }
+               dc.topicSubscribeInfoTable.Store(topic, mqs)
+               sub := &internal.SubscriptionData{}
+               dc.subscriptionDataTable.Store(topic, sub)
+
+               ctrl := gomock.NewController(t)
+               defer ctrl.Finish()
+               namesrvCli := internal.NewMockNamesrvs(ctrl)
+               namesrvCli.EXPECT().FindBrokerAddrByTopic(gomock.Any()).Return(broker)
+               dc.namesrv = namesrvCli
+
+               rmqCli := internal.NewMockRMQClient(ctrl)
+               rmqCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any()).
+                       Return(&remote.RemotingCommand{
+                               Body: []byte("{\"consumerIdList\": [\"a1\", \"a2\", \"a3\"] }"),
+                       }, nil)
+               rmqCli.EXPECT().ClientID().Return(clientID)
+               dc.client = rmqCli
+
+               var wg sync.WaitGroup
+               wg.Add(1)
+               dc.allocate = func(cg string, clientID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue {
+                       assert.Equal(t, cidAll, []string{"a1", "a2", "a3"})
+                       wg.Done()
+                       return nil
+               }
+
+               dc.doBalance()
+
+               wg.Wait()
+       })
+}
+
+func TestComputePullFromWhere(t *testing.T) {
+       Convey("Given a defaultConsumer", t, func() {
+               dc := &defaultConsumer{
+                       model: Clustering,
+                       cType: _PushConsume,
+               }
+
+               ctrl := gomock.NewController(t)
+               defer ctrl.Finish()
+
+               offsetStore := NewMockOffsetStore(ctrl)
+               dc.storage = offsetStore
+
+               mq := &primitive.MessageQueue{
+                       Topic: "test",
+               }
+
+               namesrvCli := internal.NewMockNamesrvs(ctrl)
+               dc.namesrv = namesrvCli
+
+               rmqCli := internal.NewMockRMQClient(ctrl)
+               dc.client = rmqCli
+
+               Convey("get effective offset", func() {
+                       offsetStore.EXPECT().read(gomock.Any(), gomock.Any()).Return(int64(10))
+                       res := dc.computePullFromWhere(mq)
+                       assert.Equal(t, int64(10), res)
+               })
+
+               Convey("ConsumeFromLastOffset for normal topic", func() {
+                       offsetStore.EXPECT().read(gomock.Any(), gomock.Any()).Return(int64(-1))
+                       dc.option.FromWhere = ConsumeFromLastOffset
+
+                       broker := "a"
+                       namesrvCli.EXPECT().FindBrokerAddrByName(gomock.Any()).Return(broker)
+
+                       rmqCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any()).
+                               Return(&remote.RemotingCommand{
+                                       ExtFields: map[string]string{
+                                               "offset": "20",
+                                       },
+                               }, nil)
+
+                       res := dc.computePullFromWhere(mq)
+                       assert.Equal(t, int64(20), res)
+               })
+
+               Convey("ConsumeFromFirstOffset for normal topic", func() {
+                       offsetStore.EXPECT().read(gomock.Any(), gomock.Any()).Return(int64(-1))
+                       dc.option.FromWhere = ConsumeFromFirstOffset
+
+                       res := dc.computePullFromWhere(mq)
+                       assert.Equal(t, int64(0), res)
+               })
+
+               Convey("ConsumeFromTimestamp for normal topic", func() {
+                       offsetStore.EXPECT().read(gomock.Any(), gomock.Any()).Return(int64(-1))
+                       dc.option.FromWhere = ConsumeFromTimestamp
+
+                       dc.option.ConsumeTimestamp = "20060102150405"
+
+                       broker := "a"
+                       namesrvCli.EXPECT().FindBrokerAddrByName(gomock.Any()).Return(broker)
+
+                       rmqCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any()).
+                               Return(&remote.RemotingCommand{
+                                       ExtFields: map[string]string{
+                                               "offset": "30",
+                                       },
+                               }, nil)
+
+                       res := dc.computePullFromWhere(mq)
+                       assert.Equal(t, int64(30), res)
+               })
+
+       })
+}
diff --git a/consumer/mock_offset_store.go b/consumer/mock_offset_store.go
new file mode 100644
index 0000000..093c50d
--- /dev/null
+++ b/consumer/mock_offset_store.go
@@ -0,0 +1,76 @@
+// Code generated by MockGen. DO NOT EDIT.
+// Source: offset_store.go
+
+// Package consumer is a generated GoMock package.
+package consumer
+
+import (
+       primitive "github.com/apache/rocketmq-client-go/primitive"
+       gomock "github.com/golang/mock/gomock"
+       reflect "reflect"
+)
+
+// MockOffsetStore is a mock of OffsetStore interface
+type MockOffsetStore struct {
+       ctrl     *gomock.Controller
+       recorder *MockOffsetStoreMockRecorder
+}
+
+// MockOffsetStoreMockRecorder is the mock recorder for MockOffsetStore
+type MockOffsetStoreMockRecorder struct {
+       mock *MockOffsetStore
+}
+
+// NewMockOffsetStore creates a new mock instance
+func NewMockOffsetStore(ctrl *gomock.Controller) *MockOffsetStore {
+       mock := &MockOffsetStore{ctrl: ctrl}
+       mock.recorder = &MockOffsetStoreMockRecorder{mock}
+       return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use
+func (m *MockOffsetStore) EXPECT() *MockOffsetStoreMockRecorder {
+       return m.recorder
+}
+
+// persist mocks base method
+func (m *MockOffsetStore) persist(mqs []*primitive.MessageQueue) {
+       m.ctrl.Call(m, "persist", mqs)
+}
+
+// persist indicates an expected call of persist
+func (mr *MockOffsetStoreMockRecorder) persist(mqs interface{}) *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "persist", reflect.TypeOf((*MockOffsetStore)(nil).persist), mqs)
+}
+
+// remove mocks base method
+func (m *MockOffsetStore) remove(mq *primitive.MessageQueue) {
+       m.ctrl.Call(m, "remove", mq)
+}
+
+// remove indicates an expected call of remove
+func (mr *MockOffsetStoreMockRecorder) remove(mq interface{}) *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "remove", reflect.TypeOf((*MockOffsetStore)(nil).remove), mq)
+}
+
+// read mocks base method
+func (m *MockOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 {
+       ret := m.ctrl.Call(m, "read", mq, t)
+       ret0, _ := ret[0].(int64)
+       return ret0
+}
+
+// read indicates an expected call of read
+func (mr *MockOffsetStoreMockRecorder) read(mq, t interface{}) *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "read", reflect.TypeOf((*MockOffsetStore)(nil).read), mq, t)
+}
+
+// update mocks base method
+func (m *MockOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {
+       m.ctrl.Call(m, "update", mq, offset, increaseOnly)
+}
+
+// update indicates an expected call of update
+func (mr *MockOffsetStoreMockRecorder) update(mq, offset, increaseOnly interface{}) *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "update", reflect.TypeOf((*MockOffsetStore)(nil).update), mq, offset, increaseOnly)
+}
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index f799c1a..4d1b7e9 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -51,6 +51,7 @@ func init() {
        }
 }
 
+//go:generate mockgen -source offset_store.go -destination mock_offset_store.go -self_package github.com/apache/rocketmq-client-go/consumer  --package consumer OffsetStore
 type OffsetStore interface {
        persist(mqs []*primitive.MessageQueue)
        remove(mq *primitive.MessageQueue)

Reply via email to