This is an automated email from the ASF dual-hosted git repository. dinglei pushed a commit to branch revert-179-feat-utconsumer in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
commit a7fc155596c5a128d0086b3dcb7c8092b51b7504 Author: dinglei <[email protected]> AuthorDate: Mon Aug 26 11:08:25 2019 +0800 Revert "add ut for defaultConsumer. resolve #178 (#179)" This reverts commit 59cd47fe743ff7eff4b6f110ea5fbe525988b33c. --- consumer/consumer_test.go | 138 ------------------------------------------ consumer/mock_offset_store.go | 76 ----------------------- consumer/offset_store.go | 1 - 3 files changed, 215 deletions(-) diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go index 665547d..a837b24 100644 --- a/consumer/consumer_test.go +++ b/consumer/consumer_test.go @@ -18,17 +18,10 @@ limitations under the License. package consumer import ( - "sync" "testing" "time" - "github.com/golang/mock/gomock" - . "github.com/smartystreets/goconvey/convey" "github.com/stretchr/testify/assert" - - "github.com/apache/rocketmq-client-go/internal" - "github.com/apache/rocketmq-client-go/internal/remote" - "github.com/apache/rocketmq-client-go/primitive" ) func TestParseTimestamp(t *testing.T) { @@ -37,134 +30,3 @@ func TestParseTimestamp(t *testing.T) { assert.Nil(t, err) assert.Equal(t, int64(1556652849), timestamp.Unix()) } - -func TestDoRebalance(t *testing.T) { - Convey("Given a defaultConsumer", t, func() { - dc := &defaultConsumer{ - model: Clustering, - } - - topic := "test" - broker := "127.0.0.1:8889" - clientID := "clientID" - mqs := []*primitive.MessageQueue{ - { - Topic: topic, - BrokerName: "", - QueueId: 0, - }, - { - Topic: topic, - BrokerName: "", - QueueId: 1, - }, - } - dc.topicSubscribeInfoTable.Store(topic, mqs) - sub := &internal.SubscriptionData{} - dc.subscriptionDataTable.Store(topic, sub) - - ctrl := gomock.NewController(t) - defer ctrl.Finish() - namesrvCli := internal.NewMockNamesrvs(ctrl) - namesrvCli.EXPECT().FindBrokerAddrByTopic(gomock.Any()).Return(broker) - dc.namesrv = namesrvCli - - rmqCli := internal.NewMockRMQClient(ctrl) - rmqCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any()). - Return(&remote.RemotingCommand{ - Body: []byte("{\"consumerIdList\": [\"a1\", \"a2\", \"a3\"] }"), - }, nil) - rmqCli.EXPECT().ClientID().Return(clientID) - dc.client = rmqCli - - var wg sync.WaitGroup - wg.Add(1) - dc.allocate = func(cg string, clientID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue { - assert.Equal(t, cidAll, []string{"a1", "a2", "a3"}) - wg.Done() - return nil - } - - dc.doBalance() - - wg.Wait() - }) -} - -func TestComputePullFromWhere(t *testing.T) { - Convey("Given a defaultConsumer", t, func() { - dc := &defaultConsumer{ - model: Clustering, - cType: _PushConsume, - } - - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - offsetStore := NewMockOffsetStore(ctrl) - dc.storage = offsetStore - - mq := &primitive.MessageQueue{ - Topic: "test", - } - - namesrvCli := internal.NewMockNamesrvs(ctrl) - dc.namesrv = namesrvCli - - rmqCli := internal.NewMockRMQClient(ctrl) - dc.client = rmqCli - - Convey("get effective offset", func() { - offsetStore.EXPECT().read(gomock.Any(), gomock.Any()).Return(int64(10)) - res := dc.computePullFromWhere(mq) - assert.Equal(t, int64(10), res) - }) - - Convey("ConsumeFromLastOffset for normal topic", func() { - offsetStore.EXPECT().read(gomock.Any(), gomock.Any()).Return(int64(-1)) - dc.option.FromWhere = ConsumeFromLastOffset - - broker := "a" - namesrvCli.EXPECT().FindBrokerAddrByName(gomock.Any()).Return(broker) - - rmqCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any()). - Return(&remote.RemotingCommand{ - ExtFields: map[string]string{ - "offset": "20", - }, - }, nil) - - res := dc.computePullFromWhere(mq) - assert.Equal(t, int64(20), res) - }) - - Convey("ConsumeFromFirstOffset for normal topic", func() { - offsetStore.EXPECT().read(gomock.Any(), gomock.Any()).Return(int64(-1)) - dc.option.FromWhere = ConsumeFromFirstOffset - - res := dc.computePullFromWhere(mq) - assert.Equal(t, int64(0), res) - }) - - Convey("ConsumeFromTimestamp for normal topic", func() { - offsetStore.EXPECT().read(gomock.Any(), gomock.Any()).Return(int64(-1)) - dc.option.FromWhere = ConsumeFromTimestamp - - dc.option.ConsumeTimestamp = "20060102150405" - - broker := "a" - namesrvCli.EXPECT().FindBrokerAddrByName(gomock.Any()).Return(broker) - - rmqCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any()). - Return(&remote.RemotingCommand{ - ExtFields: map[string]string{ - "offset": "30", - }, - }, nil) - - res := dc.computePullFromWhere(mq) - assert.Equal(t, int64(30), res) - }) - - }) -} diff --git a/consumer/mock_offset_store.go b/consumer/mock_offset_store.go deleted file mode 100644 index 093c50d..0000000 --- a/consumer/mock_offset_store.go +++ /dev/null @@ -1,76 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: offset_store.go - -// Package consumer is a generated GoMock package. -package consumer - -import ( - primitive "github.com/apache/rocketmq-client-go/primitive" - gomock "github.com/golang/mock/gomock" - reflect "reflect" -) - -// MockOffsetStore is a mock of OffsetStore interface -type MockOffsetStore struct { - ctrl *gomock.Controller - recorder *MockOffsetStoreMockRecorder -} - -// MockOffsetStoreMockRecorder is the mock recorder for MockOffsetStore -type MockOffsetStoreMockRecorder struct { - mock *MockOffsetStore -} - -// NewMockOffsetStore creates a new mock instance -func NewMockOffsetStore(ctrl *gomock.Controller) *MockOffsetStore { - mock := &MockOffsetStore{ctrl: ctrl} - mock.recorder = &MockOffsetStoreMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use -func (m *MockOffsetStore) EXPECT() *MockOffsetStoreMockRecorder { - return m.recorder -} - -// persist mocks base method -func (m *MockOffsetStore) persist(mqs []*primitive.MessageQueue) { - m.ctrl.Call(m, "persist", mqs) -} - -// persist indicates an expected call of persist -func (mr *MockOffsetStoreMockRecorder) persist(mqs interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "persist", reflect.TypeOf((*MockOffsetStore)(nil).persist), mqs) -} - -// remove mocks base method -func (m *MockOffsetStore) remove(mq *primitive.MessageQueue) { - m.ctrl.Call(m, "remove", mq) -} - -// remove indicates an expected call of remove -func (mr *MockOffsetStoreMockRecorder) remove(mq interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "remove", reflect.TypeOf((*MockOffsetStore)(nil).remove), mq) -} - -// read mocks base method -func (m *MockOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 { - ret := m.ctrl.Call(m, "read", mq, t) - ret0, _ := ret[0].(int64) - return ret0 -} - -// read indicates an expected call of read -func (mr *MockOffsetStoreMockRecorder) read(mq, t interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "read", reflect.TypeOf((*MockOffsetStore)(nil).read), mq, t) -} - -// update mocks base method -func (m *MockOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) { - m.ctrl.Call(m, "update", mq, offset, increaseOnly) -} - -// update indicates an expected call of update -func (mr *MockOffsetStoreMockRecorder) update(mq, offset, increaseOnly interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "update", reflect.TypeOf((*MockOffsetStore)(nil).update), mq, offset, increaseOnly) -} diff --git a/consumer/offset_store.go b/consumer/offset_store.go index 71d2c87..cedf8aa 100644 --- a/consumer/offset_store.go +++ b/consumer/offset_store.go @@ -52,7 +52,6 @@ func init() { } } -//go:generate mockgen -source offset_store.go -destination mock_offset_store.go -self_package github.com/apache/rocketmq-client-go/consumer --package consumer OffsetStore type OffsetStore interface { persist(mqs []*primitive.MessageQueue) remove(mq *primitive.MessageQueue)
