GitHub user loringgit edited a discussion: FIFO topic 消费端非严格有序
背景:
1.rockermq 5.3 版本,客户端版本:rocketmq-clients/golang/v5
2.创建FIFO topic,2 个queue
3.Consumer group 下有两个consumer实例
4.producer 发送消息的message group 都是fifo001,也就是只有一个queue有消息
'''
Queue | minOffset | maxOffset | lastUpdateTimeStamp
-- | -- | -- | --
MessageQueue [topic=test_fifo_topic, brokerName=cf8605034927, queueId=1] | 0 |
0 | 1970-01-01 08:00:00
MessageQueue [topic=test_fifo_topic, brokerName=cf8605034927, queueId=0] | 0 |
1000 | 2025-04-01 09:28:13
'''
5.consumer 端代码参考的是example中SimpleConsumer,流程:收到消息后打印log=>模拟消息处理sleep一秒=> 打印log=>
ACK,具体代码如下:
```
func main() {
// log to console
os.Setenv("mq.consoleAppender.enabled", "true")
rmq_client.ResetLogger()
// In most case, you don't need to create many consumers, singleton
pattern is more recommended.
simpleConsumer, err := rmq_client.NewSimpleConsumer(&rmq_client.Config{
Endpoint: Endpoint,
ConsumerGroup: ConsumerGroup,
Credentials: &credentials.SessionCredentials{
AccessKey: AccessKey,
AccessSecret: SecretKey,
},
},
rmq_client.WithAwaitDuration(awaitDuration),
rmq_client.WithSubscriptionExpressions(map[string]*rmq_client.FilterExpression{
Topic: rmq_client.SUB_ALL,
}),
)
if err != nil {
log.Fatal(err)
}
// start simpleConsumer
err = simpleConsumer.Start()
if err != nil {
log.Fatal(err)
}
// graceful stop simpleConsumer
defer simpleConsumer.GracefulStop()
go func() {
fmt.Println("start receive message")
for {
mvs, err := simpleConsumer.Receive(context.TODO(),
maxMessageNum, invisibleDuration)
if err != nil {
fmt.Println(err)
}
// ack message
for _, mv := range mvs {
fmt.Printf("message receive group:%s,
timestamp:%d, offset:%d,body:%s\n",
*mv.GetMessageGroup(),
time.Now().UnixMilli(), mv.GetOffset(), mv.GetBody())
if strings.EqualFold(*mv.GetMessageGroup(),
"fifo001") {
time.Sleep(time.Second * 1)
}
fmt.Printf("message receive group:%s,
timestamp:%d, offset:%d,body:%s,key=%v TO_ACK\n",
*mv.GetMessageGroup(),
time.Now().UnixMilli(), mv.GetOffset(), mv.GetBody(), mv.GetKeys())
err = simpleConsumer.Ack(context.TODO(), mv)
if err != nil {
fmt.Println("simpleConsumer.Ack, err:",
err)
}
}
fmt.Println()
}
}()
```
测试打印log如下:
```
consumer 1 log如下:
selectMessageQueueId: 1
message receive group:fifo001, timestamp:1743471018946, offset:5,body:message_5
message receive group:fifo001, timestamp:1743471019947,
offset:5,body:message_5,key=[5] TO_ACK
selectMessageQueueId: 0
message receive group:fifo001, timestamp:1743471019984, offset:7,body:message_7
message receive group:fifo001, timestamp:1743471020985,
offset:7,body:message_7,key=[7] TO_ACK
consumer2 log如下:
selectMessageQueueId: 0
message receive group:fifo001, timestamp:1743471019768,
offset:6,body:message_6,key=[6]
message receive group:fifo001, timestamp:1743471020793,
offset:6,body:message_6,key=[6] TO_ACK
selectMessageQueueId: 1
message receive group:fifo001, timestamp:1743471020806,
offset:8,body:message_8,key=[8]
message receive group:fifo001, timestamp:1743471021819,
offset:8,body:message_8,key=[8] TO_ACK
```
问题1 :顺序消息顺序性
从文档看,[顺序消息负载均衡文档说明](https://rocketmq.apache.org/zh/docs/featureBehavior/08consumerloadbalance/),“在消费过程中,前面的消息M1、M2被消费者Consumer
A1处理时,只要消费状态没有提交,消费者A2是无法并行消费后续的M3、M4消息的,必须等前面的消息提交消费状态后才能消费后面的消息”。但是从日志看,M3,M4
在M2 未ACK前已经收到。这是由broker端处理由问题,还是我的用法不对?
consumer 2收到消息:timestamp:1743471019768, offset:6
consumer 1 收到消息:timestamp:1743471018946, offset:5,
准备ack: timestamp:1743471019947,
offset:5,body:message_5,key=[5] TO_ACK
从这里看到consumer1 的offset5消息尚未ack,consumer2 已经收到了offset 6 的消息
问题2 :queue 0 没有消息,client 轮询选择了queue0的时,也有消息返回,broker是怎么处理的?
GitHub link: https://github.com/apache/rocketmq/discussions/9306
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]