This is an automated email from the ASF dual-hosted git repository. cserwen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push: new 533de03 fix: select one message queue of different broker when retry to send (#1014) 533de03 is described below commit 533de03048e19f4f925bd59a3d9cba6c5cba26b6 Author: cserwen <cser...@apache.org> AuthorDate: Wed Jun 28 15:34:34 2023 +0800 fix: select one message queue of different broker when retry to send (#1014) Co-authored-by: dengzhiwen1 <dengzhiw...@xiaomi.com> --- internal/response.go | 2 ++ producer/producer.go | 55 ++++++++++++++++++++++++++++++++++++++++------- producer/producer_test.go | 12 +++++++++-- producer/selector.go | 36 ++++++++++++++++++++++++------- producer/selector_test.go | 37 +++++++++++++++++++++++++------ 5 files changed, 118 insertions(+), 24 deletions(-) diff --git a/internal/response.go b/internal/response.go index d1c7e18..8a815ba 100644 --- a/internal/response.go +++ b/internal/response.go @@ -23,6 +23,8 @@ const ( ResFlushDiskTimeout = int16(10) ResSlaveNotAvailable = int16(11) ResFlushSlaveTimeout = int16(12) + ResServiceNotAvailable = int16(14) + ResNoPermission = int16(16) ResTopicNotExist = int16(17) ResPullNotFound = int16(19) ResPullRetryImmediately = int16(20) diff --git a/producer/producer.go b/producer/producer.go index ebd5e33..d8ca54b 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -155,6 +155,21 @@ func MarshalMessageBatch(msgs ...*primitive.Message) []byte { return buffer.Bytes() } +func needRetryCode(code int16) bool { + switch code { + case internal.ResTopicNotExist: + return true + case internal.ResServiceNotAvailable: + return true + case internal.ResError: + return true + case internal.ResNoPermission: + return true + default: + return false + } +} + func (p *defaultProducer) prepareSendRequest(msg *primitive.Message, ttl time.Duration) (string, error) { correlationId := uuid.New().String() requestClientId := p.client.ClientID() @@ -301,6 +316,7 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message, var ( err error + mq *primitive.MessageQueue ) var ( @@ -308,12 +324,23 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message, ok bool ) for retryCount := 0; retryCount < retryTime; retryCount++ { - mq := p.selectMessageQueue(msg) + var lastBrokerName string + if mq != nil { + lastBrokerName = mq.BrokerName + } + mq := p.selectMessageQueue(msg, lastBrokerName) if mq == nil { err = fmt.Errorf("the topic=%s route info not found", msg.Topic) continue } + if lastBrokerName != "" { + rlog.Warning("start retrying to send, ", map[string]interface{}{ + "lastBroker": lastBrokerName, + "newBroker": mq.BrokerName, + }) + } + addr := p.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName) if addr == "" { return fmt.Errorf("topic=%s route info not found", mq.Topic) @@ -333,6 +360,10 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message, err = _err continue } + + if needRetryCode(res.Code) && retryCount < retryTime-1 { + continue + } return p.client.ProcessSendResponse(mq.BrokerName, res, resp, msg) } return err @@ -359,7 +390,7 @@ func (p *defaultProducer) SendAsync(ctx context.Context, f func(context.Context, func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message, h func(context.Context, *primitive.SendResult, error)) error { - mq := p.selectMessageQueue(msg) + mq := p.selectMessageQueue(msg, "") if mq == nil { return errors.Errorf("the topic=%s route info not found", msg.Topic) } @@ -416,8 +447,13 @@ func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message retryTime := 1 + p.options.RetryTimes var err error + var mq *primitive.MessageQueue for retryCount := 0; retryCount < retryTime; retryCount++ { - mq := p.selectMessageQueue(msg) + var lastBrokerName string + if mq != nil { + lastBrokerName = mq.BrokerName + } + mq = p.selectMessageQueue(msg, lastBrokerName) if mq == nil { err = fmt.Errorf("the topic=%s route info not found", msg.Topic) continue @@ -554,13 +590,16 @@ func (p *defaultProducer) tryToFindTopicPublishInfo(topic string) *internal.Topi return result } -func (p *defaultProducer) selectMessageQueue(msg *primitive.Message) *primitive.MessageQueue { - topic := msg.Topic - result := p.tryToFindTopicPublishInfo(topic) - if result == nil { +func (p *defaultProducer) selectMessageQueue(msg *primitive.Message, lastBrokerName string) *primitive.MessageQueue { + result := p.tryToFindTopicPublishInfo(msg.Topic) + if result == nil || len(result.MqList) == 0 { + rlog.Warning("topic route info is nil or empty", map[string]interface{}{ + rlog.LogKeyTopic: msg.Topic, + "result": result, + }) return nil } - return p.options.Selector.Select(msg, result.MqList) + return p.options.Selector.Select(msg, result.MqList, lastBrokerName) } func (p *defaultProducer) PublishTopicList() []string { diff --git a/producer/producer_test.go b/producer/producer_test.go index e1d72dd..cbe7426 100644 --- a/producer/producer_test.go +++ b/producer/producer_test.go @@ -149,7 +149,11 @@ func TestSync(t *testing.T) { mockB4Send(p) - client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil) + cmd := &remote.RemotingCommand{ + Code: internal.ResSuccess, + } + + client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(cmd, nil) client.EXPECT().ProcessSendResponse(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do( func(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) { resp.Status = expectedResp.Status @@ -309,7 +313,11 @@ func TestSyncWithNamespace(t *testing.T) { mockB4Send(p) - client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil) + cmd := &remote.RemotingCommand{ + Code: internal.ResSuccess, + } + + client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(cmd, nil) client.EXPECT().ProcessSendResponse(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do( func(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) { resp.Status = expectedResp.Status diff --git a/producer/selector.go b/producer/selector.go index 74f5bad..c9da49a 100644 --- a/producer/selector.go +++ b/producer/selector.go @@ -27,7 +27,7 @@ import ( ) type QueueSelector interface { - Select(*primitive.Message, []*primitive.MessageQueue) *primitive.MessageQueue + Select(msg *primitive.Message, mqs []*primitive.MessageQueue, lastBrokerName string) *primitive.MessageQueue } // manualQueueSelector use the queue manually set in the provided Message's QueueID field as the queue to send. @@ -37,7 +37,7 @@ func NewManualQueueSelector() QueueSelector { return new(manualQueueSelector) } -func (manualQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue { +func (manualQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue, lastBrokerName string) *primitive.MessageQueue { return message.Queue } @@ -53,7 +53,7 @@ func NewRandomQueueSelector() QueueSelector { return s } -func (r *randomQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue { +func (r *randomQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue, lastBrokerName string) *primitive.MessageQueue { r.mux.Lock() i := r.rander.Intn(len(queues)) r.mux.Unlock() @@ -74,11 +74,32 @@ func NewRoundRobinQueueSelector() QueueSelector { return s } -func (r *roundRobinQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue { +func (r *roundRobinQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue, lastBrokerName string) *primitive.MessageQueue { t := message.Topic - var idx *uint32 r.Lock() + defer r.Unlock() + if lastBrokerName != "" { + for i := 0; i < len(queues); i++ { + idx, exist := r.indexer[t] + if !exist { + var v uint32 = 0 + idx = &v + r.indexer[t] = idx + } + *idx++ + qIndex := *idx % uint32(len(queues)) + if queues[qIndex].BrokerName != lastBrokerName { + return queues[qIndex] + } + } + } + return r.selectOneMessageQueue(t, queues) +} + +func (r *roundRobinQueueSelector) selectOneMessageQueue(t string, queues []*primitive.MessageQueue) *primitive.MessageQueue { + var idx *uint32 + idx, exist := r.indexer[t] if !exist { var v uint32 = 0 @@ -86,7 +107,6 @@ func (r *roundRobinQueueSelector) Select(message *primitive.Message, queues []*p r.indexer[t] = idx } *idx++ - r.Unlock() qIndex := *idx % uint32(len(queues)) return queues[qIndex] @@ -103,10 +123,10 @@ func NewHashQueueSelector() QueueSelector { } // hashQueueSelector choose the queue by hash if message having sharding key, otherwise choose queue by random instead. -func (h *hashQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue { +func (h *hashQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue, lastBrokerName string) *primitive.MessageQueue { key := message.GetShardingKey() if len(key) == 0 { - return h.random.Select(message, queues) + return h.random.Select(message, queues, lastBrokerName) } hasher := fnv.New32a() diff --git a/producer/selector_test.go b/producer/selector_test.go index 72dc469..e0b4384 100644 --- a/producer/selector_test.go +++ b/producer/selector_test.go @@ -26,7 +26,7 @@ import ( ) func TestRoundRobin(t *testing.T) { - queues := make([]*primitive.MessageQueue, 10) + queues := make([]*primitive.MessageQueue, 0) for i := 0; i < 10; i++ { queues = append(queues, &primitive.MessageQueue{ QueueId: i, @@ -41,18 +41,43 @@ func TestRoundRobin(t *testing.T) { Topic: "rr", } for i := 0; i < 100; i++ { - q := s.Select(m, queues) + q := s.Select(m, queues, "") expected := (i + 1) % len(queues) assert.Equal(t, queues[expected], q, "i: %d", i) - qrr := s.Select(mrr, queues) + qrr := s.Select(mrr, queues, "") expected = (i + 1) % len(queues) assert.Equal(t, queues[expected], qrr, "i: %d", i) } } +func TestRoundRobinRetry(t *testing.T) { + queues := make([]*primitive.MessageQueue, 0) + brokerA := "brokerA" + brokerB := "brokerB" + for i := 0; i < 5; i++ { + queues = append(queues, &primitive.MessageQueue{ + QueueId: i, + BrokerName: brokerA, + }) + queues = append(queues, &primitive.MessageQueue{ + QueueId: i, + BrokerName: brokerB, + }) + } + s := NewRoundRobinQueueSelector() + + m := &primitive.Message{ + Topic: "test", + } + for i := 0; i < 100; i++ { + q := s.Select(m, queues, brokerA) + assert.Equal(t, brokerB, q.BrokerName) + } +} + func TestHashQueueSelector(t *testing.T) { - queues := make([]*primitive.MessageQueue, 10) + queues := make([]*primitive.MessageQueue, 0) for i := 0; i < 10; i++ { queues = append(queues, &primitive.MessageQueue{ QueueId: i, @@ -66,13 +91,13 @@ func TestHashQueueSelector(t *testing.T) { Body: []byte("one message"), } m1.WithShardingKey("same_key") - q1 := s.Select(m1, queues) + q1 := s.Select(m1, queues, "") m2 := &primitive.Message{ Topic: "test", Body: []byte("another message"), } m2.WithShardingKey("same_key") - q2 := s.Select(m2, queues) + q2 := s.Select(m2, queues, "") assert.Equal(t, *q1, *q2) }