This is an automated email from the ASF dual-hosted git repository.

cserwen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 533de03  fix: select one message queue of different broker when retry 
to send (#1014)
533de03 is described below

commit 533de03048e19f4f925bd59a3d9cba6c5cba26b6
Author: cserwen <cser...@apache.org>
AuthorDate: Wed Jun 28 15:34:34 2023 +0800

    fix: select one message queue of different broker when retry to send (#1014)
    
    Co-authored-by: dengzhiwen1 <dengzhiw...@xiaomi.com>
---
 internal/response.go      |  2 ++
 producer/producer.go      | 55 ++++++++++++++++++++++++++++++++++++++++-------
 producer/producer_test.go | 12 +++++++++--
 producer/selector.go      | 36 ++++++++++++++++++++++++-------
 producer/selector_test.go | 37 +++++++++++++++++++++++++------
 5 files changed, 118 insertions(+), 24 deletions(-)

diff --git a/internal/response.go b/internal/response.go
index d1c7e18..8a815ba 100644
--- a/internal/response.go
+++ b/internal/response.go
@@ -23,6 +23,8 @@ const (
        ResFlushDiskTimeout     = int16(10)
        ResSlaveNotAvailable    = int16(11)
        ResFlushSlaveTimeout    = int16(12)
+       ResServiceNotAvailable  = int16(14)
+       ResNoPermission         = int16(16)
        ResTopicNotExist        = int16(17)
        ResPullNotFound         = int16(19)
        ResPullRetryImmediately = int16(20)
diff --git a/producer/producer.go b/producer/producer.go
index ebd5e33..d8ca54b 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -155,6 +155,21 @@ func MarshalMessageBatch(msgs ...*primitive.Message) 
[]byte {
        return buffer.Bytes()
 }
 
+func needRetryCode(code int16) bool {
+       switch code {
+       case internal.ResTopicNotExist:
+               return true
+       case internal.ResServiceNotAvailable:
+               return true
+       case internal.ResError:
+               return true
+       case internal.ResNoPermission:
+               return true
+       default:
+               return false
+       }
+}
+
 func (p *defaultProducer) prepareSendRequest(msg *primitive.Message, ttl 
time.Duration) (string, error) {
        correlationId := uuid.New().String()
        requestClientId := p.client.ClientID()
@@ -301,6 +316,7 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg 
*primitive.Message,
 
        var (
                err error
+               mq  *primitive.MessageQueue
        )
 
        var (
@@ -308,12 +324,23 @@ func (p *defaultProducer) sendSync(ctx context.Context, 
msg *primitive.Message,
                ok          bool
        )
        for retryCount := 0; retryCount < retryTime; retryCount++ {
-               mq := p.selectMessageQueue(msg)
+               var lastBrokerName string
+               if mq != nil {
+                       lastBrokerName = mq.BrokerName
+               }
+               mq := p.selectMessageQueue(msg, lastBrokerName)
                if mq == nil {
                        err = fmt.Errorf("the topic=%s route info not found", 
msg.Topic)
                        continue
                }
 
+               if lastBrokerName != "" {
+                       rlog.Warning("start retrying to send, ", 
map[string]interface{}{
+                               "lastBroker": lastBrokerName,
+                               "newBroker":  mq.BrokerName,
+                       })
+               }
+
                addr := 
p.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName)
                if addr == "" {
                        return fmt.Errorf("topic=%s route info not found", 
mq.Topic)
@@ -333,6 +360,10 @@ func (p *defaultProducer) sendSync(ctx context.Context, 
msg *primitive.Message,
                        err = _err
                        continue
                }
+
+               if needRetryCode(res.Code) && retryCount < retryTime-1 {
+                       continue
+               }
                return p.client.ProcessSendResponse(mq.BrokerName, res, resp, 
msg)
        }
        return err
@@ -359,7 +390,7 @@ func (p *defaultProducer) SendAsync(ctx context.Context, f 
func(context.Context,
 
 func (p *defaultProducer) sendAsync(ctx context.Context, msg 
*primitive.Message, h func(context.Context, *primitive.SendResult, error)) 
error {
 
-       mq := p.selectMessageQueue(msg)
+       mq := p.selectMessageQueue(msg, "")
        if mq == nil {
                return errors.Errorf("the topic=%s route info not found", 
msg.Topic)
        }
@@ -416,8 +447,13 @@ func (p *defaultProducer) sendOneWay(ctx context.Context, 
msg *primitive.Message
        retryTime := 1 + p.options.RetryTimes
 
        var err error
+       var mq *primitive.MessageQueue
        for retryCount := 0; retryCount < retryTime; retryCount++ {
-               mq := p.selectMessageQueue(msg)
+               var lastBrokerName string
+               if mq != nil {
+                       lastBrokerName = mq.BrokerName
+               }
+               mq = p.selectMessageQueue(msg, lastBrokerName)
                if mq == nil {
                        err = fmt.Errorf("the topic=%s route info not found", 
msg.Topic)
                        continue
@@ -554,13 +590,16 @@ func (p *defaultProducer) tryToFindTopicPublishInfo(topic 
string) *internal.Topi
        return result
 }
 
-func (p *defaultProducer) selectMessageQueue(msg *primitive.Message) 
*primitive.MessageQueue {
-       topic := msg.Topic
-       result := p.tryToFindTopicPublishInfo(topic)
-       if result == nil {
+func (p *defaultProducer) selectMessageQueue(msg *primitive.Message, 
lastBrokerName string) *primitive.MessageQueue {
+       result := p.tryToFindTopicPublishInfo(msg.Topic)
+       if result == nil || len(result.MqList) == 0 {
+               rlog.Warning("topic route info is nil or empty", 
map[string]interface{}{
+                       rlog.LogKeyTopic: msg.Topic,
+                       "result":         result,
+               })
                return nil
        }
-       return p.options.Selector.Select(msg, result.MqList)
+       return p.options.Selector.Select(msg, result.MqList, lastBrokerName)
 }
 
 func (p *defaultProducer) PublishTopicList() []string {
diff --git a/producer/producer_test.go b/producer/producer_test.go
index e1d72dd..cbe7426 100644
--- a/producer/producer_test.go
+++ b/producer/producer_test.go
@@ -149,7 +149,11 @@ func TestSync(t *testing.T) {
 
        mockB4Send(p)
 
-       client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), 
gomock.Any()).Return(nil, nil)
+       cmd := &remote.RemotingCommand{
+               Code: internal.ResSuccess,
+       }
+
+       client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), 
gomock.Any()).Return(cmd, nil)
        client.EXPECT().ProcessSendResponse(gomock.Any(), gomock.Any(), 
gomock.Any(), gomock.Any()).Do(
                func(brokerName string, cmd *remote.RemotingCommand, resp 
*primitive.SendResult, msgs ...*primitive.Message) {
                        resp.Status = expectedResp.Status
@@ -309,7 +313,11 @@ func TestSyncWithNamespace(t *testing.T) {
 
        mockB4Send(p)
 
-       client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), 
gomock.Any()).Return(nil, nil)
+       cmd := &remote.RemotingCommand{
+               Code: internal.ResSuccess,
+       }
+
+       client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), 
gomock.Any()).Return(cmd, nil)
        client.EXPECT().ProcessSendResponse(gomock.Any(), gomock.Any(), 
gomock.Any(), gomock.Any()).Do(
                func(brokerName string, cmd *remote.RemotingCommand, resp 
*primitive.SendResult, msgs ...*primitive.Message) {
                        resp.Status = expectedResp.Status
diff --git a/producer/selector.go b/producer/selector.go
index 74f5bad..c9da49a 100644
--- a/producer/selector.go
+++ b/producer/selector.go
@@ -27,7 +27,7 @@ import (
 )
 
 type QueueSelector interface {
-       Select(*primitive.Message, []*primitive.MessageQueue) 
*primitive.MessageQueue
+       Select(msg *primitive.Message, mqs []*primitive.MessageQueue, 
lastBrokerName string) *primitive.MessageQueue
 }
 
 // manualQueueSelector use the queue manually set in the provided Message's 
QueueID  field as the queue to send.
@@ -37,7 +37,7 @@ func NewManualQueueSelector() QueueSelector {
        return new(manualQueueSelector)
 }
 
-func (manualQueueSelector) Select(message *primitive.Message, queues 
[]*primitive.MessageQueue) *primitive.MessageQueue {
+func (manualQueueSelector) Select(message *primitive.Message, queues 
[]*primitive.MessageQueue, lastBrokerName string) *primitive.MessageQueue {
        return message.Queue
 }
 
@@ -53,7 +53,7 @@ func NewRandomQueueSelector() QueueSelector {
        return s
 }
 
-func (r *randomQueueSelector) Select(message *primitive.Message, queues 
[]*primitive.MessageQueue) *primitive.MessageQueue {
+func (r *randomQueueSelector) Select(message *primitive.Message, queues 
[]*primitive.MessageQueue, lastBrokerName string) *primitive.MessageQueue {
        r.mux.Lock()
        i := r.rander.Intn(len(queues))
        r.mux.Unlock()
@@ -74,11 +74,32 @@ func NewRoundRobinQueueSelector() QueueSelector {
        return s
 }
 
-func (r *roundRobinQueueSelector) Select(message *primitive.Message, queues 
[]*primitive.MessageQueue) *primitive.MessageQueue {
+func (r *roundRobinQueueSelector) Select(message *primitive.Message, queues 
[]*primitive.MessageQueue, lastBrokerName string) *primitive.MessageQueue {
        t := message.Topic
-       var idx *uint32
 
        r.Lock()
+       defer r.Unlock()
+       if lastBrokerName != "" {
+               for i := 0; i < len(queues); i++ {
+                       idx, exist := r.indexer[t]
+                       if !exist {
+                               var v uint32 = 0
+                               idx = &v
+                               r.indexer[t] = idx
+                       }
+                       *idx++
+                       qIndex := *idx % uint32(len(queues))
+                       if queues[qIndex].BrokerName != lastBrokerName {
+                               return queues[qIndex]
+                       }
+               }
+       }
+       return r.selectOneMessageQueue(t, queues)
+}
+
+func (r *roundRobinQueueSelector) selectOneMessageQueue(t string, queues 
[]*primitive.MessageQueue) *primitive.MessageQueue {
+       var idx *uint32
+
        idx, exist := r.indexer[t]
        if !exist {
                var v uint32 = 0
@@ -86,7 +107,6 @@ func (r *roundRobinQueueSelector) Select(message 
*primitive.Message, queues []*p
                r.indexer[t] = idx
        }
        *idx++
-       r.Unlock()
 
        qIndex := *idx % uint32(len(queues))
        return queues[qIndex]
@@ -103,10 +123,10 @@ func NewHashQueueSelector() QueueSelector {
 }
 
 // hashQueueSelector choose the queue by hash if message having sharding key, 
otherwise choose queue by random instead.
-func (h *hashQueueSelector) Select(message *primitive.Message, queues 
[]*primitive.MessageQueue) *primitive.MessageQueue {
+func (h *hashQueueSelector) Select(message *primitive.Message, queues 
[]*primitive.MessageQueue, lastBrokerName string) *primitive.MessageQueue {
        key := message.GetShardingKey()
        if len(key) == 0 {
-               return h.random.Select(message, queues)
+               return h.random.Select(message, queues, lastBrokerName)
        }
 
        hasher := fnv.New32a()
diff --git a/producer/selector_test.go b/producer/selector_test.go
index 72dc469..e0b4384 100644
--- a/producer/selector_test.go
+++ b/producer/selector_test.go
@@ -26,7 +26,7 @@ import (
 )
 
 func TestRoundRobin(t *testing.T) {
-       queues := make([]*primitive.MessageQueue, 10)
+       queues := make([]*primitive.MessageQueue, 0)
        for i := 0; i < 10; i++ {
                queues = append(queues, &primitive.MessageQueue{
                        QueueId: i,
@@ -41,18 +41,43 @@ func TestRoundRobin(t *testing.T) {
                Topic: "rr",
        }
        for i := 0; i < 100; i++ {
-               q := s.Select(m, queues)
+               q := s.Select(m, queues, "")
                expected := (i + 1) % len(queues)
                assert.Equal(t, queues[expected], q, "i: %d", i)
 
-               qrr := s.Select(mrr, queues)
+               qrr := s.Select(mrr, queues, "")
                expected = (i + 1) % len(queues)
                assert.Equal(t, queues[expected], qrr, "i: %d", i)
        }
 }
 
+func TestRoundRobinRetry(t *testing.T) {
+       queues := make([]*primitive.MessageQueue, 0)
+       brokerA := "brokerA"
+       brokerB := "brokerB"
+       for i := 0; i < 5; i++ {
+               queues = append(queues, &primitive.MessageQueue{
+                       QueueId:    i,
+                       BrokerName: brokerA,
+               })
+               queues = append(queues, &primitive.MessageQueue{
+                       QueueId:    i,
+                       BrokerName: brokerB,
+               })
+       }
+       s := NewRoundRobinQueueSelector()
+
+       m := &primitive.Message{
+               Topic: "test",
+       }
+       for i := 0; i < 100; i++ {
+               q := s.Select(m, queues, brokerA)
+               assert.Equal(t, brokerB, q.BrokerName)
+       }
+}
+
 func TestHashQueueSelector(t *testing.T) {
-       queues := make([]*primitive.MessageQueue, 10)
+       queues := make([]*primitive.MessageQueue, 0)
        for i := 0; i < 10; i++ {
                queues = append(queues, &primitive.MessageQueue{
                        QueueId: i,
@@ -66,13 +91,13 @@ func TestHashQueueSelector(t *testing.T) {
                Body:  []byte("one message"),
        }
        m1.WithShardingKey("same_key")
-       q1 := s.Select(m1, queues)
+       q1 := s.Select(m1, queues, "")
 
        m2 := &primitive.Message{
                Topic: "test",
                Body:  []byte("another message"),
        }
        m2.WithShardingKey("same_key")
-       q2 := s.Select(m2, queues)
+       q2 := s.Select(m2, queues, "")
        assert.Equal(t, *q1, *q2)
 }

Reply via email to