This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 16443e2  [ISSUE #832] Client may submit wrong offset when network 
instability
16443e2 is described below

commit 16443e264556653ee821ce9a5391c7a13275fa0e
Author: AaronWang <[email protected]>
AuthorDate: Tue Jun 14 17:19:54 2022 +0800

    [ISSUE #832] Client may submit wrong offset when network instability
    
    Co-authored-by: chenhui <[email protected]>
---
 consumer/consumer.go          | 28 ++++++++++++++-----
 consumer/consumer_test.go     | 24 +++++++++++------
 consumer/mock_offset_store.go | 63 +++++++++++++++++++++++++++++--------------
 consumer/offset_store.go      | 29 ++++++++++++++------
 consumer/offset_store_test.go | 24 ++++++++---------
 consumer/pull_consumer.go     |  6 +++--
 consumer/push_consumer.go     | 17 +++++++++---
 7 files changed, 130 insertions(+), 61 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index ed843cf..6e53eaf 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -28,6 +28,7 @@ import (
        "time"
 
        "github.com/apache/rocketmq-client-go/v2/errors"
+
        jsoniter "github.com/json-iterator/go"
        "github.com/tidwall/gjson"
 
@@ -699,8 +700,9 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic 
string, mqs []*primitiv
                                continue
                        }
                        dc.storage.remove(&mq)
-                       nextOffset := dc.computePullFromWhere(&mq)
-                       if nextOffset >= 0 {
+                       nextOffset, err := 
dc.computePullFromWhereWithException(&mq)
+
+                       if nextOffset >= 0 && err == nil {
                                _, exist := dc.processQueueTable.Load(mq)
                                if exist {
                                        rlog.Debug("do defaultConsumer, mq 
already exist", map[string]interface{}{
@@ -741,12 +743,23 @@ func (dc *defaultConsumer) 
removeUnnecessaryMessageQueue(mq *primitive.MessageQu
        return true
 }
 
+// Deprecated: Use computePullFromWhereWithException instead.
 func (dc *defaultConsumer) computePullFromWhere(mq *primitive.MessageQueue) 
int64 {
+       result, _ := dc.computePullFromWhereWithException(mq)
+       return result
+}
+
+func (dc *defaultConsumer) computePullFromWhereWithException(mq 
*primitive.MessageQueue) (int64, error) {
        if dc.cType == _PullConsume {
-               return 0
+               return 0, nil
        }
-       var result = int64(-1)
-       lastOffset := dc.storage.read(mq, _ReadFromStore)
+       result := int64(-1)
+       lastOffset, err := dc.storage.readWithException(mq, _ReadFromStore)
+       if err != nil {
+               // 这里 lastOffset = -1
+               return lastOffset, err
+       }
+
        if lastOffset >= 0 {
                result = lastOffset
        } else {
@@ -803,7 +816,7 @@ func (dc *defaultConsumer) computePullFromWhere(mq 
*primitive.MessageQueue) int6
                default:
                }
        }
-       return result
+       return result, nil
 }
 
 func (dc *defaultConsumer) pullInner(ctx context.Context, queue 
*primitive.MessageQueue, data *internal.SubscriptionData,
@@ -950,7 +963,8 @@ func (dc *defaultConsumer) queryMaxOffset(mq 
*primitive.MessageQueue) (int64, er
 }
 
 func (dc *defaultConsumer) queryOffset(mq *primitive.MessageQueue) int64 {
-       return dc.storage.read(mq, _ReadMemoryThenStore)
+       result, _ := dc.storage.readWithException(mq, _ReadMemoryThenStore)
+       return result
 }
 
 // SearchOffsetByTimestamp with specific queueId and topic
diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go
index 12ccd18..e441290 100644
--- a/consumer/consumer_test.go
+++ b/consumer/consumer_test.go
@@ -26,6 +26,7 @@ import (
        . "github.com/smartystreets/goconvey/convey"
        "github.com/stretchr/testify/assert"
 
+       "github.com/apache/rocketmq-client-go/v2/errors"
        "github.com/apache/rocketmq-client-go/v2/internal"
        "github.com/apache/rocketmq-client-go/v2/internal/remote"
        "github.com/apache/rocketmq-client-go/v2/primitive"
@@ -116,13 +117,20 @@ func TestComputePullFromWhere(t *testing.T) {
                rmqCli.SetNameSrv(namesrvCli)
 
                Convey("get effective offset", func() {
-                       offsetStore.EXPECT().read(gomock.Any(), 
gomock.Any()).Return(int64(10))
-                       res := dc.computePullFromWhere(mq)
+                       offsetStore.EXPECT().readWithException(gomock.Any(), 
gomock.Any()).Return(int64(10), nil)
+                       res, _ := dc.computePullFromWhereWithException(mq)
                        assert.Equal(t, int64(10), res)
                })
 
+               Convey("get offset error", func() {
+                       offsetStore.EXPECT().readWithException(gomock.Any(), 
gomock.Any()).Return(int64(-1), errors.ErrRequestTimeout)
+
+                       _, err := dc.computePullFromWhereWithException(mq)
+                       assert.Equal(t, err, errors.ErrRequestTimeout)
+               })
+
                Convey("ConsumeFromLastOffset for normal topic", func() {
-                       offsetStore.EXPECT().read(gomock.Any(), 
gomock.Any()).Return(int64(-1))
+                       offsetStore.EXPECT().readWithException(gomock.Any(), 
gomock.Any()).Return(int64(-1), nil)
                        dc.option.FromWhere = ConsumeFromLastOffset
 
                        broker := "a"
@@ -135,20 +143,20 @@ func TestComputePullFromWhere(t *testing.T) {
                                        },
                                }, nil)
 
-                       res := dc.computePullFromWhere(mq)
+                       res, _ := dc.computePullFromWhereWithException(mq)
                        assert.Equal(t, int64(20), res)
                })
 
                Convey("ConsumeFromFirstOffset for normal topic", func() {
-                       offsetStore.EXPECT().read(gomock.Any(), 
gomock.Any()).Return(int64(-1))
+                       offsetStore.EXPECT().readWithException(gomock.Any(), 
gomock.Any()).Return(int64(-1), nil)
                        dc.option.FromWhere = ConsumeFromFirstOffset
 
-                       res := dc.computePullFromWhere(mq)
+                       res, _ := dc.computePullFromWhereWithException(mq)
                        assert.Equal(t, int64(0), res)
                })
 
                Convey("ConsumeFromTimestamp for normal topic", func() {
-                       offsetStore.EXPECT().read(gomock.Any(), 
gomock.Any()).Return(int64(-1))
+                       offsetStore.EXPECT().readWithException(gomock.Any(), 
gomock.Any()).Return(int64(-1), nil)
                        dc.option.FromWhere = ConsumeFromTimestamp
 
                        dc.option.ConsumeTimestamp = "20060102150405"
@@ -163,7 +171,7 @@ func TestComputePullFromWhere(t *testing.T) {
                                        },
                                }, nil)
 
-                       res := dc.computePullFromWhere(mq)
+                       res, _ := dc.computePullFromWhereWithException(mq)
                        assert.Equal(t, int64(30), res)
                })
 
diff --git a/consumer/mock_offset_store.go b/consumer/mock_offset_store.go
index bac1cdb..145ec3d 100644
--- a/consumer/mock_offset_store.go
+++ b/consumer/mock_offset_store.go
@@ -28,67 +28,90 @@ import (
        gomock "github.com/golang/mock/gomock"
 )
 
-// MockOffsetStore is a mock of OffsetStore interface
+// MockOffsetStore is a mock of OffsetStore interface.
 type MockOffsetStore struct {
        ctrl     *gomock.Controller
        recorder *MockOffsetStoreMockRecorder
 }
 
-// MockOffsetStoreMockRecorder is the mock recorder for MockOffsetStore
+// MockOffsetStoreMockRecorder is the mock recorder for MockOffsetStore.
 type MockOffsetStoreMockRecorder struct {
        mock *MockOffsetStore
 }
 
-// NewMockOffsetStore creates a new mock instance
+// NewMockOffsetStore creates a new mock instance.
 func NewMockOffsetStore(ctrl *gomock.Controller) *MockOffsetStore {
        mock := &MockOffsetStore{ctrl: ctrl}
        mock.recorder = &MockOffsetStoreMockRecorder{mock}
        return mock
 }
 
-// EXPECT returns an object that allows the caller to indicate expected use
+// EXPECT returns an object that allows the caller to indicate expected use.
 func (m *MockOffsetStore) EXPECT() *MockOffsetStoreMockRecorder {
        return m.recorder
 }
 
-// persist mocks base method
+// persist mocks base method.
 func (m *MockOffsetStore) persist(mqs []*primitive.MessageQueue) {
+       m.ctrl.T.Helper()
        m.ctrl.Call(m, "persist", mqs)
 }
 
-// persist indicates an expected call of persist
+// persist indicates an expected call of persist.
 func (mr *MockOffsetStoreMockRecorder) persist(mqs interface{}) *gomock.Call {
+       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "persist", 
reflect.TypeOf((*MockOffsetStore)(nil).persist), mqs)
 }
 
-// remove mocks base method
-func (m *MockOffsetStore) remove(mq *primitive.MessageQueue) {
-       m.ctrl.Call(m, "remove", mq)
-}
-
-// remove indicates an expected call of remove
-func (mr *MockOffsetStoreMockRecorder) remove(mq interface{}) *gomock.Call {
-       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "remove", 
reflect.TypeOf((*MockOffsetStore)(nil).remove), mq)
-}
-
-// read mocks base method
+// read mocks base method.
 func (m *MockOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 {
+       m.ctrl.T.Helper()
        ret := m.ctrl.Call(m, "read", mq, t)
        ret0, _ := ret[0].(int64)
        return ret0
 }
 
-// read indicates an expected call of read
+// read indicates an expected call of read.
 func (mr *MockOffsetStoreMockRecorder) read(mq, t interface{}) *gomock.Call {
+       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "read", 
reflect.TypeOf((*MockOffsetStore)(nil).read), mq, t)
 }
 
-// update mocks base method
+// readWithException mocks base method.
+func (m *MockOffsetStore) readWithException(mq *primitive.MessageQueue, t 
readType) (int64, error) {
+       m.ctrl.T.Helper()
+       ret := m.ctrl.Call(m, "readWithException", mq, t)
+       ret0, _ := ret[0].(int64)
+       ret1, _ := ret[1].(error)
+       return ret0, ret1
+}
+
+// readWithException indicates an expected call of readWithException.
+func (mr *MockOffsetStoreMockRecorder) readWithException(mq, t interface{}) 
*gomock.Call {
+       mr.mock.ctrl.T.Helper()
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"readWithException", reflect.TypeOf((*MockOffsetStore)(nil).readWithException), 
mq, t)
+}
+
+// remove mocks base method.
+func (m *MockOffsetStore) remove(mq *primitive.MessageQueue) {
+       m.ctrl.T.Helper()
+       m.ctrl.Call(m, "remove", mq)
+}
+
+// remove indicates an expected call of remove.
+func (mr *MockOffsetStoreMockRecorder) remove(mq interface{}) *gomock.Call {
+       mr.mock.ctrl.T.Helper()
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "remove", 
reflect.TypeOf((*MockOffsetStore)(nil).remove), mq)
+}
+
+// update mocks base method.
 func (m *MockOffsetStore) update(mq *primitive.MessageQueue, offset int64, 
increaseOnly bool) {
+       m.ctrl.T.Helper()
        m.ctrl.Call(m, "update", mq, offset, increaseOnly)
 }
 
-// update indicates an expected call of update
+// update indicates an expected call of update.
 func (mr *MockOffsetStoreMockRecorder) update(mq, offset, increaseOnly 
interface{}) *gomock.Call {
+       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "update", 
reflect.TypeOf((*MockOffsetStore)(nil).update), mq, offset, increaseOnly)
 }
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index 5ecfd14..86ecd18 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -57,6 +57,7 @@ type OffsetStore interface {
        persist(mqs []*primitive.MessageQueue)
        remove(mq *primitive.MessageQueue)
        read(mq *primitive.MessageQueue, t readType) int64
+       readWithException(mq *primitive.MessageQueue, t readType) (int64, error)
        update(mq *primitive.MessageQueue, offset int64, increaseOnly bool)
 }
 
@@ -156,21 +157,27 @@ func (local *localFileOffsetStore) load() {
        }
 }
 
+// Deprecated: Use readWithException instead.
 func (local *localFileOffsetStore) read(mq *primitive.MessageQueue, t 
readType) int64 {
+       result, _ := local.readWithException(mq, t)
+       return result
+}
+
+func (local *localFileOffsetStore) readWithException(mq 
*primitive.MessageQueue, t readType) (int64, error) {
        switch t {
        case _ReadFromMemory, _ReadMemoryThenStore:
                off := readFromMemory(local.OffsetTable, mq)
                if off >= 0 || (off == -1 && t == _ReadFromMemory) {
-                       return off
+                       return off, nil
                }
                fallthrough
        case _ReadFromStore:
                local.load()
-               return readFromMemory(local.OffsetTable, mq)
+               return readFromMemory(local.OffsetTable, mq), nil
        default:
 
        }
-       return -1
+       return -1, nil
 }
 
 func (local *localFileOffsetStore) update(mq *primitive.MessageQueue, offset 
int64, increaseOnly bool) {
@@ -284,18 +291,24 @@ func (r *remoteBrokerOffsetStore) remove(mq 
*primitive.MessageQueue) {
        })
 }
 
+// Deprecated: Use readWithException instead.
 func (r *remoteBrokerOffsetStore) read(mq *primitive.MessageQueue, t readType) 
int64 {
+       result, _ := r.readWithException(mq, t)
+       return result
+}
+
+func (r *remoteBrokerOffsetStore) readWithException(mq 
*primitive.MessageQueue, t readType) (int64, error) {
        r.mutex.RLock()
        switch t {
        case _ReadFromMemory, _ReadMemoryThenStore:
                off, exist := r.OffsetTable[*mq]
                if exist {
                        r.mutex.RUnlock()
-                       return off
+                       return off, nil
                }
                if t == _ReadFromMemory {
                        r.mutex.RUnlock()
-                       return -1
+                       return -1, nil
                }
                fallthrough
        case _ReadFromStore:
@@ -307,7 +320,7 @@ func (r *remoteBrokerOffsetStore) read(mq 
*primitive.MessageQueue, t readType) i
                                rlog.LogKeyUnderlayError: err,
                        })
                        r.mutex.RUnlock()
-                       return -1
+                       return -1, err
                }
                rlog.Warning("fetch offset of mq from broker success", 
map[string]interface{}{
                        rlog.LogKeyConsumerGroup: r.group,
@@ -316,11 +329,11 @@ func (r *remoteBrokerOffsetStore) read(mq 
*primitive.MessageQueue, t readType) i
                })
                r.mutex.RUnlock()
                r.update(mq, off, true)
-               return off
+               return off, nil
        default:
        }
 
-       return -1
+       return -1, nil
 }
 
 func (r *remoteBrokerOffsetStore) update(mq *primitive.MessageQueue, offset 
int64, increaseOnly bool) {
diff --git a/consumer/offset_store_test.go b/consumer/offset_store_test.go
index cfa0eaa..d833f36 100644
--- a/consumer/offset_store_test.go
+++ b/consumer/offset_store_test.go
@@ -98,7 +98,7 @@ func TestLocalFileOffsetStore(t *testing.T) {
                                }
                                for _, value := range cases {
                                        localStore.update(value.queue, 
value.setOffset, false)
-                                       offset := localStore.read(value.queue, 
_ReadFromMemory)
+                                       offset, _ := 
localStore.readWithException(value.queue, _ReadFromMemory)
                                        So(offset, ShouldEqual, 
value.expectedOffset)
                                }
                        })
@@ -119,7 +119,7 @@ func TestLocalFileOffsetStore(t *testing.T) {
                                }
                                for _, value := range cases {
                                        localStore.update(value.queue, 
value.setOffset, true)
-                                       offset := localStore.read(value.queue, 
_ReadFromMemory)
+                                       offset, _ := 
localStore.readWithException(value.queue, _ReadFromMemory)
                                        So(offset, ShouldEqual, 
value.expectedOffset)
                                }
                        })
@@ -127,16 +127,16 @@ func TestLocalFileOffsetStore(t *testing.T) {
 
                Convey("test persist", func() {
                        localStore.update(mq, 1, false)
-                       offset := localStore.read(mq, _ReadFromMemory)
+                       offset, _ := localStore.readWithException(mq, 
_ReadFromMemory)
                        So(offset, ShouldEqual, 1)
 
                        queues := []*primitive.MessageQueue{mq}
                        localStore.persist(queues)
-                       offset = localStore.read(mq, _ReadFromStore)
+                       offset, _ = localStore.readWithException(mq, 
_ReadFromStore)
                        So(offset, ShouldEqual, 1)
 
                        
localStore.(*localFileOffsetStore).OffsetTable.Delete(MessageQueueKey(*mq))
-                       offset = localStore.read(mq, _ReadMemoryThenStore)
+                       offset, _ = localStore.readWithException(mq, 
_ReadMemoryThenStore)
                        So(offset, ShouldEqual, 1)
                })
        })
@@ -178,7 +178,7 @@ func TestRemoteBrokerOffsetStore(t *testing.T) {
                                }
                                for _, value := range cases {
                                        remoteStore.update(value.queue, 
value.setOffset, false)
-                                       offset := remoteStore.read(value.queue, 
_ReadFromMemory)
+                                       offset, _ := 
remoteStore.readWithException(value.queue, _ReadFromMemory)
                                        So(offset, ShouldEqual, 
value.expectedOffset)
                                }
                        })
@@ -199,7 +199,7 @@ func TestRemoteBrokerOffsetStore(t *testing.T) {
                                }
                                for _, value := range cases {
                                        remoteStore.update(value.queue, 
value.setOffset, true)
-                                       offset := remoteStore.read(value.queue, 
_ReadFromMemory)
+                                       offset, _ := 
remoteStore.readWithException(value.queue, _ReadFromMemory)
                                        So(offset, ShouldEqual, 
value.expectedOffset)
                                }
                        })
@@ -219,24 +219,24 @@ func TestRemoteBrokerOffsetStore(t *testing.T) {
                        rmqClient.EXPECT().InvokeSync(gomock.Any(), 
gomock.Any(), gomock.Any(), gomock.Any()).Return(ret, nil).MaxTimes(2)
 
                        remoteStore.persist(queues)
-                       offset := remoteStore.read(mq, _ReadFromStore)
+                       offset, _ := remoteStore.readWithException(mq, 
_ReadFromStore)
                        So(offset, ShouldEqual, 1)
 
                        remoteStore.remove(mq)
-                       offset = remoteStore.read(mq, _ReadFromMemory)
+                       offset, _ = remoteStore.readWithException(mq, 
_ReadFromMemory)
                        So(offset, ShouldEqual, -1)
-                       offset = remoteStore.read(mq, _ReadMemoryThenStore)
+                       offset, _ = remoteStore.readWithException(mq, 
_ReadMemoryThenStore)
                        So(offset, ShouldEqual, 1)
 
                })
 
                Convey("test remove", func() {
                        remoteStore.update(mq, 1, false)
-                       offset := remoteStore.read(mq, _ReadFromMemory)
+                       offset, _ := remoteStore.readWithException(mq, 
_ReadFromMemory)
                        So(offset, ShouldEqual, 1)
 
                        remoteStore.remove(mq)
-                       offset = remoteStore.read(mq, _ReadFromMemory)
+                       offset, _ = remoteStore.readWithException(mq, 
_ReadFromMemory)
                        So(offset, ShouldEqual, -1)
                })
        })
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index ff6f7d7..4699601 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -20,10 +20,11 @@ package consumer
 import (
        "context"
        "fmt"
-       errors2 "github.com/apache/rocketmq-client-go/v2/errors"
        "sync"
        "sync/atomic"
 
+       errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+
        "github.com/pkg/errors"
 
        "github.com/apache/rocketmq-client-go/v2/internal"
@@ -219,7 +220,8 @@ func (c *defaultPullConsumer) makeSureStateOK() error {
 }
 
 func (c *defaultPullConsumer) nextOffsetOf(queue *primitive.MessageQueue) 
int64 {
-       return c.computePullFromWhere(queue)
+       result, _ := c.computePullFromWhereWithException(queue)
+       return result
 }
 
 // PullFrom pull messages of queue from the offset to offset + numbers
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 801f412..4ad5ee3 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -20,7 +20,6 @@ package consumer
 import (
        "context"
        "fmt"
-       errors2 "github.com/apache/rocketmq-client-go/v2/errors"
        "math"
        "runtime/pprof"
        "strconv"
@@ -29,6 +28,8 @@ import (
        "sync/atomic"
        "time"
 
+       errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+
        "github.com/pkg/errors"
 
        "github.com/apache/rocketmq-client-go/v2/internal"
@@ -375,7 +376,7 @@ func (pc *pushConsumer) GetConsumerRunningInfo(stack bool) 
*internal.ConsumerRun
                mq := key.(primitive.MessageQueue)
                pq := value.(*processQueue)
                pInfo := pq.currentInfo()
-               pInfo.CommitOffset = pc.storage.read(&mq, _ReadMemoryThenStore)
+               pInfo.CommitOffset, _ = pc.storage.readWithException(&mq, 
_ReadMemoryThenStore)
                info.MQTable[mq] = pInfo
                return true
        })
@@ -644,7 +645,15 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
                } else {
                        if pq.IsLock() {
                                if !request.lockedFirst {
-                                       offset := 
pc.computePullFromWhere(request.mq)
+                                       offset, err := 
pc.computePullFromWhereWithException(request.mq)
+                                       if err != nil {
+                                               
rlog.Warning("computePullFromWhere from broker error", map[string]interface{}{
+                                                       
rlog.LogKeyUnderlayError: err.Error(),
+                                               })
+                                               sleepTime = 
_PullDelayTimeWhenError
+                                               goto NEXT
+                                       }
+
                                        brokerBusy := offset < 
request.nextOffset
                                        rlog.Info("the first time to pull 
message, so fix offset from broker, offset maybe changed", 
map[string]interface{}{
                                                rlog.LogKeyPullRequest:      
request.String(),
@@ -684,7 +693,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
                )
 
                if pc.model == Clustering {
-                       commitOffsetValue = pc.storage.read(request.mq, 
_ReadFromMemory)
+                       commitOffsetValue, _ = 
pc.storage.readWithException(request.mq, _ReadFromMemory)
                        if commitOffsetValue > 0 {
                                commitOffsetEnable = true
                        }

Reply via email to