GitHub user loringgit added a comment to the discussion: FIFO topic 消费端非严格有序
1.topic创建的时候有指定是FIFO类型,创建命令:
```
./bin/mqadmin updateTopic -n nameserver(ip:port) -b mybroker(ip:port) -t
test_fifo_topic -p 6 -r 2 -w 2 -a +message.type=FIFO
```
我用的是
github.com/apache/rocketmq-clients/golang/v5,NewSimpleConsumer默认使用的是v2.ClientType_SIMPLE_CONSUMER,有试过把这个默认值修改为
ClientType_PUSH_CONSUMER,测试结果没变化,你的意思是需要参考Java的PushConsumer自己实现OrderedMessage的Consumer吗?
2.日志中select MessageQueueId 是在ReceiveMessage方法增加的一行日志,代码见下面的注释
```
func (sc *defaultSimpleConsumer) Receive(ctx context.Context, maxMessageNum
int32, invisibleDuration time.Duration) ([]*MessageView, error) {
if !sc.isOn() {
return nil, fmt.Errorf("simple consumer is not running")
}
if maxMessageNum <= 0 {
return nil, fmt.Errorf("maxMessageNum must be greater than 0")
}
sc.subscriptionExpressionsLock.RLock()
topics := make([]string, 0, len(sc.subscriptionExpressions))
for k := range sc.subscriptionExpressions {
topics = append(topics, k)
}
sc.subscriptionExpressionsLock.RUnlock()
// All topic is subscribed.
if len(topics) == 0 {
return nil, fmt.Errorf("there is no topic to receive message")
}
next := atomic.AddInt32(&sc.topicIndex, 1)
idx := utils.Mod(next+1, len(topics))
topic := topics[idx]
sc.subscriptionExpressionsLock.RLock()
filterExpression, ok := sc.subscriptionExpressions[topic]
sc.subscriptionExpressionsLock.RUnlock()
if !ok {
return nil, fmt.Errorf("no found filterExpression about topic:
%s", topic)
}
subLoadBalancer, err := sc.getSubscriptionTopicRouteResult(ctx, topic)
if err != nil {
return nil, err
}
selectMessageQueue, err := subLoadBalancer.TakeMessageQueue()
if err != nil {
return nil, err
}
fmt.Printf("selectMessageQueueId: %v\n", selectMessageQueue.GetId())
// 打印QueueId
request := sc.wrapReceiveMessageRequest(int(maxMessageNum),
selectMessageQueue, filterExpression, invisibleDuration)
timeout := sc.scOpts.awaitDuration + sc.cli.opts.timeout
return sc.receiveMessage(ctx, request, selectMessageQueue, timeout)
}
```
@francisoliverlee
GitHub link:
https://github.com/apache/rocketmq/discussions/9306#discussioncomment-12696340
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]