This is an automated email from the ASF dual-hosted git repository.

yuzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 98ee94d  [ISSUE #1160] add brokerName in request protocol (#1161)
98ee94d is described below

commit 98ee94df100b8e53509523ee7b5b4633e0e0bb97
Author: yuz10 <[email protected]>
AuthorDate: Mon Sep 2 22:23:59 2024 +0800

    [ISSUE #1160] add brokerName in request protocol (#1161)
---
 consumer/consumer.go      | 14 +++++++++-----
 consumer/offset_store.go  |  2 ++
 consumer/process_queue.go |  2 +-
 consumer/pull_consumer.go |  2 ++
 consumer/push_consumer.go |  2 ++
 internal/request.go       | 27 ++++++++++++++++++++++-----
 internal/trace.go         |  1 +
 producer/producer.go      |  2 ++
 8 files changed, 41 insertions(+), 11 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index bbb5415..98eb17b 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -889,6 +889,7 @@ func (dc *defaultConsumer) pullInner(ctx context.Context, 
queue *primitive.Messa
                SubExpression:        data.SubString,
                // TODO: add subversion
                ExpressionType: string(data.ExpType),
+               BrokerName:     queue.BrokerName,
        }
 
        if data.ExpType == string(TAG) {
@@ -999,8 +1000,9 @@ func (dc *defaultConsumer) queryMaxOffset(mq 
*primitive.MessageQueue) (int64, er
        }
 
        request := &internal.GetMaxOffsetRequestHeader{
-               Topic:   mq.Topic,
-               QueueId: mq.QueueId,
+               Topic:      mq.Topic,
+               QueueId:    mq.QueueId,
+               BrokerName: mq.BrokerName,
        }
 
        cmd := remote.NewRemotingCommand(internal.ReqGetMaxOffset, request, nil)
@@ -1029,9 +1031,10 @@ func (dc *defaultConsumer) searchOffsetByTimestamp(mq 
*primitive.MessageQueue, t
        }
 
        request := &internal.SearchOffsetRequestHeader{
-               Topic:     mq.Topic,
-               QueueId:   mq.QueueId,
-               Timestamp: timestamp,
+               Topic:      mq.Topic,
+               QueueId:    mq.QueueId,
+               Timestamp:  timestamp,
+               BrokerName: mq.BrokerName,
        }
 
        cmd := remote.NewRemotingCommand(internal.ReqSearchOffsetByTimestamp, 
request, nil)
@@ -1128,6 +1131,7 @@ func buildSendToRetryRequest(mq *primitive.MessageQueue, 
msg *primitive.Message,
                Properties:        msg.MarshallProperties(),
                ReconsumeTimes:    int(reconsumeTimes),
                MaxReconsumeTimes: int(maxReconsumeTimes),
+               BrokerName:        mq.BrokerName,
        }
 
        return remote.NewRemotingCommand(internal.ReqSendMessage, req, msg.Body)
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index b0f9da9..543f397 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -390,6 +390,7 @@ func (r *remoteBrokerOffsetStore) 
fetchConsumeOffsetFromBroker(group string, mq
                ConsumerGroup: group,
                Topic:         mq.Topic,
                QueueId:       mq.QueueId,
+               BrokerName:    mq.BrokerName,
        }
        cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, 
queryOffsetRequest, nil)
        res, err := r.client.InvokeSync(context.Background(), broker, cmd, 
3*time.Second)
@@ -429,6 +430,7 @@ func (r *remoteBrokerOffsetStore) 
updateConsumeOffsetToBroker(group string, mq p
                Topic:         mq.Topic,
                QueueId:       mq.QueueId,
                CommitOffset:  off,
+               BrokerName:    mq.BrokerName,
        }
        cmd := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, 
updateOffsetRequest, nil)
        return r.client.InvokeOneWay(context.Background(), broker, cmd, 
5*time.Second)
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 120595f..db67c9f 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -275,7 +275,7 @@ func (pq *processQueue) cleanExpiredMsg(pc *pushConsumer) {
                                rlog.LogKeyQueueOffset: msg.QueueOffset,
                        })
                        pq.mutex.RUnlock()
-                       if !pc.sendMessageBack("", msg, 
int(3+msg.ReconsumeTimes)) {
+                       if !pc.sendMessageBack(msg.Queue.BrokerName, msg, 
int(3+msg.ReconsumeTimes)) {
                                rlog.Error("send message back to broker error 
when clean expired messages", map[string]interface{}{
                                        rlog.LogKeyConsumerGroup: 
pc.consumerGroup,
                                })
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index b26c829..a64c163 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -644,6 +644,7 @@ func (pc *defaultPullConsumer) buildSendBackRequest(msg 
*primitive.MessageExt, d
                DelayLevel:        delayLevel,
                OriginMsgId:       msg.MsgId,
                MaxReconsumeTimes: pc.getMaxReconsumeTimes(),
+               BrokerName:        msg.Queue.BrokerName,
        }
 
        return remote.NewRemotingCommand(internal.ReqConsumerSendMsgBack, req, 
nil)
@@ -746,6 +747,7 @@ func (pc *defaultPullConsumer) pullMessage(request 
*PullRequest) {
                        SubExpression:        sd.SubString,
                        ExpressionType:       string(TAG),
                        SuspendTimeoutMillis: 20 * time.Second,
+                       BrokerName:           request.mq.BrokerName,
                }
 
                brokerResult := pc.defaultConsumer.tryFindBroker(request.mq)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 0dfeee9..1a3d233 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -819,6 +819,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
                        SubExpression:        subExpression,
                        ExpressionType:       string(TAG),
                        SuspendTimeoutMillis: 20 * time.Second,
+                       BrokerName:           request.mq.BrokerName,
                }
                //
                //if data.ExpType == string(TAG) {
@@ -937,6 +938,7 @@ func (pc *pushConsumer) buildSendBackRequest(msg 
*primitive.MessageExt, delayLev
                DelayLevel:        delayLevel,
                OriginMsgId:       msg.MsgId,
                MaxReconsumeTimes: pc.getMaxReconsumeTimes(),
+               BrokerName:        msg.Queue.BrokerName,
        }
 
        return remote.NewRemotingCommand(internal.ReqConsumerSendMsgBack, req, 
nil)
diff --git a/internal/request.go b/internal/request.go
index 7e86b50..93d7a5d 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -72,6 +72,7 @@ type SendMessageRequestHeader struct {
        Batch                 bool
        DefaultTopic          string
        DefaultTopicQueueNums int
+       BrokerName            string
 }
 
 func (request *SendMessageRequestHeader) Encode() map[string]string {
@@ -89,6 +90,7 @@ func (request *SendMessageRequestHeader) Encode() 
map[string]string {
        maps["defaultTopicQueueNums"] = "4"
        maps["batch"] = strconv.FormatBool(request.Batch)
        maps["properties"] = request.Properties
+       maps["bname"] = request.BrokerName
 
        return maps
 }
@@ -101,6 +103,7 @@ type EndTransactionRequestHeader struct {
        FromTransactionCheck bool
        MsgID                string
        TransactionId        string
+       BrokerName           string
 }
 
 type SendMessageRequestV2Header struct {
@@ -122,6 +125,7 @@ func (request *SendMessageRequestV2Header) Encode() 
map[string]string {
        maps["k"] = strconv.FormatBool(request.UnitMode)
        maps["l"] = strconv.Itoa(request.MaxReconsumeTimes)
        maps["m"] = strconv.FormatBool(request.Batch)
+       maps["n"] = request.BrokerName
        return maps
 }
 
@@ -134,6 +138,7 @@ func (request *EndTransactionRequestHeader) Encode() 
map[string]string {
        maps["fromTransactionCheck"] = 
strconv.FormatBool(request.FromTransactionCheck)
        maps["msgId"] = request.MsgID
        maps["transactionId"] = request.TransactionId
+       maps["bname"] = request.BrokerName
        return maps
 }
 
@@ -185,6 +190,7 @@ type ConsumerSendMsgBackRequestHeader struct {
        OriginTopic       string
        UnitMode          bool
        MaxReconsumeTimes int32
+       BrokerName        string
 }
 
 func (request *ConsumerSendMsgBackRequestHeader) Encode() map[string]string {
@@ -196,6 +202,7 @@ func (request *ConsumerSendMsgBackRequestHeader) Encode() 
map[string]string {
        maps["originTopic"] = request.OriginTopic
        maps["unitMode"] = strconv.FormatBool(request.UnitMode)
        maps["maxReconsumeTimes"] = strconv.Itoa(int(request.MaxReconsumeTimes))
+       maps["bname"] = request.BrokerName
 
        return maps
 }
@@ -212,6 +219,7 @@ type PullMessageRequestHeader struct {
        SubExpression        string
        SubVersion           int64
        ExpressionType       string
+       BrokerName           string
 }
 
 func (request *PullMessageRequestHeader) Encode() map[string]string {
@@ -227,6 +235,7 @@ func (request *PullMessageRequestHeader) Encode() 
map[string]string {
        maps["subscription"] = request.SubExpression
        maps["subVersion"] = fmt.Sprintf("%d", request.SubVersion)
        maps["expressionType"] = request.ExpressionType
+       maps["bname"] = request.BrokerName
 
        return maps
 }
@@ -242,14 +251,16 @@ func (request *GetConsumerListRequestHeader) Encode() 
map[string]string {
 }
 
 type GetMaxOffsetRequestHeader struct {
-       Topic   string
-       QueueId int
+       Topic      string
+       QueueId    int
+       BrokerName string
 }
 
 func (request *GetMaxOffsetRequestHeader) Encode() map[string]string {
        maps := make(map[string]string)
        maps["topic"] = request.Topic
        maps["queueId"] = strconv.Itoa(request.QueueId)
+       maps["bname"] = request.BrokerName
        return maps
 }
 
@@ -257,6 +268,7 @@ type QueryConsumerOffsetRequestHeader struct {
        ConsumerGroup string
        Topic         string
        QueueId       int
+       BrokerName    string
 }
 
 func (request *QueryConsumerOffsetRequestHeader) Encode() map[string]string {
@@ -264,13 +276,15 @@ func (request *QueryConsumerOffsetRequestHeader) Encode() 
map[string]string {
        maps["consumerGroup"] = request.ConsumerGroup
        maps["topic"] = request.Topic
        maps["queueId"] = strconv.Itoa(request.QueueId)
+       maps["bname"] = request.BrokerName
        return maps
 }
 
 type SearchOffsetRequestHeader struct {
-       Topic     string
-       QueueId   int
-       Timestamp int64
+       Topic      string
+       QueueId    int
+       Timestamp  int64
+       BrokerName string
 }
 
 func (request *SearchOffsetRequestHeader) Encode() map[string]string {
@@ -278,6 +292,7 @@ func (request *SearchOffsetRequestHeader) Encode() 
map[string]string {
        maps["topic"] = request.Topic
        maps["queueId"] = strconv.Itoa(request.QueueId)
        maps["timestamp"] = strconv.FormatInt(request.Timestamp, 10)
+       maps["bname"] = request.BrokerName
        return maps
 }
 
@@ -286,6 +301,7 @@ type UpdateConsumerOffsetRequestHeader struct {
        Topic         string
        QueueId       int
        CommitOffset  int64
+       BrokerName    string
 }
 
 func (request *UpdateConsumerOffsetRequestHeader) Encode() map[string]string {
@@ -294,6 +310,7 @@ func (request *UpdateConsumerOffsetRequestHeader) Encode() 
map[string]string {
        maps["topic"] = request.Topic
        maps["queueId"] = strconv.Itoa(request.QueueId)
        maps["commitOffset"] = strconv.FormatInt(request.CommitOffset, 10)
+       maps["bname"] = request.BrokerName
        return maps
 }
 
diff --git a/internal/trace.go b/internal/trace.go
index f7cea8d..344acd6 100644
--- a/internal/trace.go
+++ b/internal/trace.go
@@ -527,6 +527,7 @@ func (td *traceDispatcher) buildSendRequest(mq 
*primitive.MessageQueue,
                BornTimestamp: time.Now().UnixNano() / int64(time.Millisecond),
                Flag:          msg.Flag,
                Properties:    msg.MarshallProperties(),
+               BrokerName:    mq.BrokerName,
        }
 
        return remote.NewRemotingCommand(ReqSendMessage, req, msg.Body)
diff --git a/producer/producer.go b/producer/producer.go
index eb3cd2e..00eb351 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -542,6 +542,7 @@ func (p *defaultProducer) buildSendRequest(mq 
*primitive.MessageQueue,
                Batch:                 msg.Batch,
                DefaultTopic:          p.options.CreateTopicKey,
                DefaultTopicQueueNums: p.options.DefaultTopicQueueNums,
+               BrokerName:            mq.BrokerName,
        }
 
        msgType := msg.GetProperty(primitive.PropertyMsgType)
@@ -762,6 +763,7 @@ func (tp *transactionProducer) endTransaction(result 
primitive.SendResult, err e
                TranStateTableOffset: result.QueueOffset,
                MsgID:                result.MsgID,
                CommitOrRollback:     tp.transactionState(state),
+               BrokerName:           result.MessageQueue.BrokerName,
        }
 
        req := remote.NewRemotingCommand(internal.ReqENDTransaction, 
requestHeader, nil)

Reply via email to