This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 80962ade Support fifo parallel consume in go pushconsumer (#1122)
80962ade is described below
commit 80962ade844b973f5633feb3e65b289a0ff3593f
Author: guyinyou <[email protected]>
AuthorDate: Wed Nov 5 13:59:55 2025 +0800
Support fifo parallel consume in go pushconsumer (#1122)
Change-Id: If602f7dc9316d01dd7c8846f73c507ffe46db283
Co-authored-by: guyinyou <[email protected]>
---
golang/consumer_service.go | 36 +++++++++++++++++++++++++++++++++---
golang/push_consumer.go | 4 ++--
golang/push_consumer_options.go | 34 ++++++++++++++++++++++------------
3 files changed, 57 insertions(+), 17 deletions(-)
diff --git a/golang/consumer_service.go b/golang/consumer_service.go
index 847605f9..316749fb 100644
--- a/golang/consumer_service.go
+++ b/golang/consumer_service.go
@@ -107,6 +107,7 @@ type standardConsumeService struct {
}
type fifoConsumeService struct {
baseConsumeService
+ enableFifoConsumeAccelerator bool
}
func (scs *standardConsumeService) consume(pq ProcessQueue, messageViews
[]*MessageView) {
@@ -133,7 +134,35 @@ func NewStandardConsumeService(clientId string,
messageListener MessageListener,
}
func (fcs *fifoConsumeService) consume(pq ProcessQueue, messageViews
[]*MessageView) {
- fcs.consumeIteratively(pq, &messageViews, 0)
+ if !fcs.enableFifoConsumeAccelerator || len(messageViews) <= 1 {
+ fcs.consumeIteratively(pq, &messageViews, 0)
+ return
+ }
+ // Group messages by messageGroup
+ messageViewsGroupByMessageGroup := make(map[string][]*MessageView)
+ messageViewsWithoutMessageGroup := make([]*MessageView, 0)
+ for _, messageView := range messageViews {
+ messageGroup := messageView.GetMessageGroup()
+ if messageGroup != nil && *messageGroup != "" {
+ messageViewsGroupByMessageGroup[*messageGroup] =
append(messageViewsGroupByMessageGroup[*messageGroup], messageView)
+ } else {
+ messageViewsWithoutMessageGroup =
append(messageViewsWithoutMessageGroup, messageView)
+ }
+ }
+
+ groupNum := len(messageViewsGroupByMessageGroup)
+ if len(messageViewsWithoutMessageGroup) > 0 {
+ groupNum++
+ }
+ sugarBaseLogger.Debugf("FifoConsumeService parallel consume,
messageViewsNum=%d, groupNum=%d", len(messageViews), groupNum)
+
+ // Consume messages in parallel by group
+ for _, group := range messageViewsGroupByMessageGroup {
+ fcs.consumeIteratively(pq, &group, 0)
+ }
+ if len(messageViewsWithoutMessageGroup) > 0 {
+ fcs.consumeIteratively(pq, &messageViewsWithoutMessageGroup, 0)
+ }
}
func (fcs *fifoConsumeService) consumeIteratively(pq ProcessQueue,
messageViewsPtr *[]*MessageView, ptr int) {
if messageViewsPtr == nil {
@@ -161,8 +190,9 @@ func (fcs *fifoConsumeService) consumeIteratively(pq
ProcessQueue, messageViewsP
})
}
-func NewFiFoConsumeService(clientId string, messageListener MessageListener,
consumptionExecutor *simpleThreadPool, messageInterceptor MessageInterceptor)
*fifoConsumeService {
+func NewFiFoConsumeService(clientId string, messageListener MessageListener,
consumptionExecutor *simpleThreadPool, messageInterceptor MessageInterceptor,
enableFifoConsumeAccelerator bool) *fifoConsumeService {
return &fifoConsumeService{
- *NewBaseConsumeService(clientId, messageListener,
consumptionExecutor, messageInterceptor),
+ baseConsumeService: *NewBaseConsumeService(clientId,
messageListener, consumptionExecutor, messageInterceptor),
+ enableFifoConsumeAccelerator: enableFifoConsumeAccelerator,
}
}
diff --git a/golang/push_consumer.go b/golang/push_consumer.go
index 38788073..df2aa411 100644
--- a/golang/push_consumer.go
+++ b/golang/push_consumer.go
@@ -388,8 +388,8 @@ func (pc *defaultPushConsumer) Start() error {
threadPool := NewSimpleThreadPool("MessageConsumption",
int(pc.pcOpts.maxCacheMessageCount), int(pc.pcOpts.consumptionThreadCount))
if pc.pcSettings.isFifo {
- pc.consumerService = NewFiFoConsumeService(pc.cli.clientID,
pc.pcOpts.messageListener, threadPool, pc.cli)
- pc.cli.log.Infof("Create FIFO consume service,
consumerGroup=%s, clientId=%s", pc.cli.config.ConsumerGroup, pc.cli.clientID)
+ pc.consumerService = NewFiFoConsumeService(pc.cli.clientID,
pc.pcOpts.messageListener, threadPool, pc.cli,
pc.pcOpts.enableFifoConsumeAccelerator)
+ pc.cli.log.Infof("Create FIFO consume service,
consumerGroup=%s, clientId=%s, enableFifoConsumeAccelerator=%t",
pc.cli.config.ConsumerGroup, pc.cli.clientID,
pc.pcOpts.enableFifoConsumeAccelerator)
} else {
pc.consumerService = NewStandardConsumeService(pc.cli.clientID,
pc.pcOpts.messageListener, threadPool, pc.cli)
pc.cli.log.Infof("Create standard consume service,
consumerGroup=%s, clientId=%s", pc.cli.config.ConsumerGroup, pc.cli.clientID)
diff --git a/golang/push_consumer_options.go b/golang/push_consumer_options.go
index 1fea675c..411fca19 100644
--- a/golang/push_consumer_options.go
+++ b/golang/push_consumer_options.go
@@ -55,21 +55,23 @@ func (l *FuncMessageListener) consume(msg *MessageView)
ConsumerResult {
var _ = MessageListener(&FuncMessageListener{})
type pushConsumerOptions struct {
- subscriptionExpressions *sync.Map
- awaitDuration time.Duration
- maxCacheMessageCount int32
- maxCacheMessageSizeInBytes int64
- consumptionThreadCount int32
- messageListener MessageListener
- clientFunc NewClientFunc
+ subscriptionExpressions *sync.Map
+ awaitDuration time.Duration
+ maxCacheMessageCount int32
+ maxCacheMessageSizeInBytes int64
+ consumptionThreadCount int32
+ messageListener MessageListener
+ clientFunc NewClientFunc
+ enableFifoConsumeAccelerator bool
}
var defaultPushConsumerOptions = pushConsumerOptions{
- clientFunc: NewClient,
- awaitDuration: 0,
- maxCacheMessageCount: 1024,
- maxCacheMessageSizeInBytes: 64 * 1024 * 1024,
- consumptionThreadCount: 20,
+ clientFunc: NewClient,
+ awaitDuration: 0,
+ maxCacheMessageCount: 1024,
+ maxCacheMessageSizeInBytes: 64 * 1024 * 1024,
+ consumptionThreadCount: 20,
+ enableFifoConsumeAccelerator: false,
}
// A ConsumerOption sets options such as tag, etc.
@@ -136,6 +138,14 @@ func WithPushMessageListener(messageListener
MessageListener) PushConsumerOption
})
}
+// WithPushEnableFifoConsumeAccelerator sets enable fifo consume accelerator.
+// If enabled, the consumer will consume messages in parallel by messageGroup,
+func WithPushEnableFifoConsumeAccelerator(enableFifoConsumeAccelerator bool)
PushConsumerOption {
+ return newFuncPushConsumerOption(func(o *pushConsumerOptions) {
+ o.enableFifoConsumeAccelerator = enableFifoConsumeAccelerator
+ })
+}
+
var _ = ClientSettings(&pushConsumerSettings{})
type pushConsumerSettings struct {