This is an automated email from the ASF dual-hosted git repository.
yuzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 98ee94d [ISSUE #1160] add brokerName in request protocol (#1161)
98ee94d is described below
commit 98ee94df100b8e53509523ee7b5b4633e0e0bb97
Author: yuz10 <[email protected]>
AuthorDate: Mon Sep 2 22:23:59 2024 +0800
[ISSUE #1160] add brokerName in request protocol (#1161)
---
consumer/consumer.go | 14 +++++++++-----
consumer/offset_store.go | 2 ++
consumer/process_queue.go | 2 +-
consumer/pull_consumer.go | 2 ++
consumer/push_consumer.go | 2 ++
internal/request.go | 27 ++++++++++++++++++++++-----
internal/trace.go | 1 +
producer/producer.go | 2 ++
8 files changed, 41 insertions(+), 11 deletions(-)
diff --git a/consumer/consumer.go b/consumer/consumer.go
index bbb5415..98eb17b 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -889,6 +889,7 @@ func (dc *defaultConsumer) pullInner(ctx context.Context,
queue *primitive.Messa
SubExpression: data.SubString,
// TODO: add subversion
ExpressionType: string(data.ExpType),
+ BrokerName: queue.BrokerName,
}
if data.ExpType == string(TAG) {
@@ -999,8 +1000,9 @@ func (dc *defaultConsumer) queryMaxOffset(mq
*primitive.MessageQueue) (int64, er
}
request := &internal.GetMaxOffsetRequestHeader{
- Topic: mq.Topic,
- QueueId: mq.QueueId,
+ Topic: mq.Topic,
+ QueueId: mq.QueueId,
+ BrokerName: mq.BrokerName,
}
cmd := remote.NewRemotingCommand(internal.ReqGetMaxOffset, request, nil)
@@ -1029,9 +1031,10 @@ func (dc *defaultConsumer) searchOffsetByTimestamp(mq
*primitive.MessageQueue, t
}
request := &internal.SearchOffsetRequestHeader{
- Topic: mq.Topic,
- QueueId: mq.QueueId,
- Timestamp: timestamp,
+ Topic: mq.Topic,
+ QueueId: mq.QueueId,
+ Timestamp: timestamp,
+ BrokerName: mq.BrokerName,
}
cmd := remote.NewRemotingCommand(internal.ReqSearchOffsetByTimestamp, request, nil)
@@ -1128,6 +1131,7 @@ func buildSendToRetryRequest(mq *primitive.MessageQueue,
msg *primitive.Message,
Properties: msg.MarshallProperties(),
ReconsumeTimes: int(reconsumeTimes),
MaxReconsumeTimes: int(maxReconsumeTimes),
+ BrokerName: mq.BrokerName,
}
return remote.NewRemotingCommand(internal.ReqSendMessage, req, msg.Body)
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index b0f9da9..543f397 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -390,6 +390,7 @@ func (r *remoteBrokerOffsetStore)
fetchConsumeOffsetFromBroker(group string, mq
ConsumerGroup: group,
Topic: mq.Topic,
QueueId: mq.QueueId,
+ BrokerName: mq.BrokerName,
}
cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil)
res, err := r.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
@@ -429,6 +430,7 @@ func (r *remoteBrokerOffsetStore)
updateConsumeOffsetToBroker(group string, mq p
Topic: mq.Topic,
QueueId: mq.QueueId,
CommitOffset: off,
+ BrokerName: mq.BrokerName,
}
cmd := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, updateOffsetRequest, nil)
return r.client.InvokeOneWay(context.Background(), broker, cmd, 5*time.Second)
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 120595f..db67c9f 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -275,7 +275,7 @@ func (pq *processQueue) cleanExpiredMsg(pc *pushConsumer) {
rlog.LogKeyQueueOffset: msg.QueueOffset,
})
pq.mutex.RUnlock()
- if !pc.sendMessageBack("", msg, int(3+msg.ReconsumeTimes)) {
+ if !pc.sendMessageBack(msg.Queue.BrokerName, msg, int(3+msg.ReconsumeTimes)) {
rlog.Error("send message back to broker error when clean expired messages", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index b26c829..a64c163 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -644,6 +644,7 @@ func (pc *defaultPullConsumer) buildSendBackRequest(msg
*primitive.MessageExt, d
DelayLevel: delayLevel,
OriginMsgId: msg.MsgId,
MaxReconsumeTimes: pc.getMaxReconsumeTimes(),
+ BrokerName: msg.Queue.BrokerName,
}
return remote.NewRemotingCommand(internal.ReqConsumerSendMsgBack, req, nil)
@@ -746,6 +747,7 @@ func (pc *defaultPullConsumer) pullMessage(request
*PullRequest) {
SubExpression: sd.SubString,
ExpressionType: string(TAG),
SuspendTimeoutMillis: 20 * time.Second,
+ BrokerName: request.mq.BrokerName,
}
brokerResult := pc.defaultConsumer.tryFindBroker(request.mq)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 0dfeee9..1a3d233 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -819,6 +819,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
SubExpression: subExpression,
ExpressionType: string(TAG),
SuspendTimeoutMillis: 20 * time.Second,
+ BrokerName: request.mq.BrokerName,
}
//
//if data.ExpType == string(TAG) {
@@ -937,6 +938,7 @@ func (pc *pushConsumer) buildSendBackRequest(msg
*primitive.MessageExt, delayLev
DelayLevel: delayLevel,
OriginMsgId: msg.MsgId,
MaxReconsumeTimes: pc.getMaxReconsumeTimes(),
+ BrokerName: msg.Queue.BrokerName,
}
return remote.NewRemotingCommand(internal.ReqConsumerSendMsgBack, req, nil)
diff --git a/internal/request.go b/internal/request.go
index 7e86b50..93d7a5d 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -72,6 +72,7 @@ type SendMessageRequestHeader struct {
Batch bool
DefaultTopic string
DefaultTopicQueueNums int
+ BrokerName string
}
func (request *SendMessageRequestHeader) Encode() map[string]string {
@@ -89,6 +90,7 @@ func (request *SendMessageRequestHeader) Encode()
map[string]string {
maps["defaultTopicQueueNums"] = "4"
maps["batch"] = strconv.FormatBool(request.Batch)
maps["properties"] = request.Properties
+ maps["bname"] = request.BrokerName
return maps
}
@@ -101,6 +103,7 @@ type EndTransactionRequestHeader struct {
FromTransactionCheck bool
MsgID string
TransactionId string
+ BrokerName string
}
type SendMessageRequestV2Header struct {
@@ -122,6 +125,7 @@ func (request *SendMessageRequestV2Header) Encode()
map[string]string {
maps["k"] = strconv.FormatBool(request.UnitMode)
maps["l"] = strconv.Itoa(request.MaxReconsumeTimes)
maps["m"] = strconv.FormatBool(request.Batch)
+ maps["n"] = request.BrokerName
return maps
}
@@ -134,6 +138,7 @@ func (request *EndTransactionRequestHeader) Encode()
map[string]string {
maps["fromTransactionCheck"] = strconv.FormatBool(request.FromTransactionCheck)
maps["msgId"] = request.MsgID
maps["transactionId"] = request.TransactionId
+ maps["bname"] = request.BrokerName
return maps
}
@@ -185,6 +190,7 @@ type ConsumerSendMsgBackRequestHeader struct {
OriginTopic string
UnitMode bool
MaxReconsumeTimes int32
+ BrokerName string
}
func (request *ConsumerSendMsgBackRequestHeader) Encode() map[string]string {
@@ -196,6 +202,7 @@ func (request *ConsumerSendMsgBackRequestHeader) Encode()
map[string]string {
maps["originTopic"] = request.OriginTopic
maps["unitMode"] = strconv.FormatBool(request.UnitMode)
maps["maxReconsumeTimes"] = strconv.Itoa(int(request.MaxReconsumeTimes))
+ maps["bname"] = request.BrokerName
return maps
}
@@ -212,6 +219,7 @@ type PullMessageRequestHeader struct {
SubExpression string
SubVersion int64
ExpressionType string
+ BrokerName string
}
func (request *PullMessageRequestHeader) Encode() map[string]string {
@@ -227,6 +235,7 @@ func (request *PullMessageRequestHeader) Encode()
map[string]string {
maps["subscription"] = request.SubExpression
maps["subVersion"] = fmt.Sprintf("%d", request.SubVersion)
maps["expressionType"] = request.ExpressionType
+ maps["bname"] = request.BrokerName
return maps
}
@@ -242,14 +251,16 @@ func (request *GetConsumerListRequestHeader) Encode()
map[string]string {
}
type GetMaxOffsetRequestHeader struct {
- Topic string
- QueueId int
+ Topic string
+ QueueId int
+ BrokerName string
}
func (request *GetMaxOffsetRequestHeader) Encode() map[string]string {
maps := make(map[string]string)
maps["topic"] = request.Topic
maps["queueId"] = strconv.Itoa(request.QueueId)
+ maps["bname"] = request.BrokerName
return maps
}
@@ -257,6 +268,7 @@ type QueryConsumerOffsetRequestHeader struct {
ConsumerGroup string
Topic string
QueueId int
+ BrokerName string
}
func (request *QueryConsumerOffsetRequestHeader) Encode() map[string]string {
@@ -264,13 +276,15 @@ func (request *QueryConsumerOffsetRequestHeader) Encode()
map[string]string {
maps["consumerGroup"] = request.ConsumerGroup
maps["topic"] = request.Topic
maps["queueId"] = strconv.Itoa(request.QueueId)
+ maps["bname"] = request.BrokerName
return maps
}
type SearchOffsetRequestHeader struct {
- Topic string
- QueueId int
- Timestamp int64
+ Topic string
+ QueueId int
+ Timestamp int64
+ BrokerName string
}
func (request *SearchOffsetRequestHeader) Encode() map[string]string {
@@ -278,6 +292,7 @@ func (request *SearchOffsetRequestHeader) Encode()
map[string]string {
maps["topic"] = request.Topic
maps["queueId"] = strconv.Itoa(request.QueueId)
maps["timestamp"] = strconv.FormatInt(request.Timestamp, 10)
+ maps["bname"] = request.BrokerName
return maps
}
@@ -286,6 +301,7 @@ type UpdateConsumerOffsetRequestHeader struct {
Topic string
QueueId int
CommitOffset int64
+ BrokerName string
}
func (request *UpdateConsumerOffsetRequestHeader) Encode() map[string]string {
@@ -294,6 +310,7 @@ func (request *UpdateConsumerOffsetRequestHeader) Encode()
map[string]string {
maps["topic"] = request.Topic
maps["queueId"] = strconv.Itoa(request.QueueId)
maps["commitOffset"] = strconv.FormatInt(request.CommitOffset, 10)
+ maps["bname"] = request.BrokerName
return maps
}
diff --git a/internal/trace.go b/internal/trace.go
index f7cea8d..344acd6 100644
--- a/internal/trace.go
+++ b/internal/trace.go
@@ -527,6 +527,7 @@ func (td *traceDispatcher) buildSendRequest(mq
*primitive.MessageQueue,
BornTimestamp: time.Now().UnixNano() / int64(time.Millisecond),
Flag: msg.Flag,
Properties: msg.MarshallProperties(),
+ BrokerName: mq.BrokerName,
}
return remote.NewRemotingCommand(ReqSendMessage, req, msg.Body)
diff --git a/producer/producer.go b/producer/producer.go
index eb3cd2e..00eb351 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -542,6 +542,7 @@ func (p *defaultProducer) buildSendRequest(mq
*primitive.MessageQueue,
Batch: msg.Batch,
DefaultTopic: p.options.CreateTopicKey,
DefaultTopicQueueNums: p.options.DefaultTopicQueueNums,
+ BrokerName: mq.BrokerName,
}
msgType := msg.GetProperty(primitive.PropertyMsgType)
@@ -762,6 +763,7 @@ func (tp *transactionProducer) endTransaction(result
primitive.SendResult, err e
TranStateTableOffset: result.QueueOffset,
MsgID: result.MsgID,
CommitOrRollback: tp.transactionState(state),
+ BrokerName: result.MessageQueue.BrokerName,
}
req := remote.NewRemotingCommand(internal.ReqENDTransaction, requestHeader, nil)