This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 80962ade Support fifo parallel consume in go pushconsumer (#1122)
80962ade is described below

commit 80962ade844b973f5633feb3e65b289a0ff3593f
Author: guyinyou <[email protected]>
AuthorDate: Wed Nov 5 13:59:55 2025 +0800

    Support fifo parallel consume in go pushconsumer (#1122)
    
    Change-Id: If602f7dc9316d01dd7c8846f73c507ffe46db283
    
    Co-authored-by: guyinyou <[email protected]>
---
 golang/consumer_service.go      | 36 +++++++++++++++++++++++++++++++++---
 golang/push_consumer.go         |  4 ++--
 golang/push_consumer_options.go | 34 ++++++++++++++++++++++------------
 3 files changed, 57 insertions(+), 17 deletions(-)

diff --git a/golang/consumer_service.go b/golang/consumer_service.go
index 847605f9..316749fb 100644
--- a/golang/consumer_service.go
+++ b/golang/consumer_service.go
@@ -107,6 +107,7 @@ type standardConsumeService struct {
 }
 type fifoConsumeService struct {
        baseConsumeService
+       enableFifoConsumeAccelerator bool
 }
 
 func (scs *standardConsumeService) consume(pq ProcessQueue, messageViews 
[]*MessageView) {
@@ -133,7 +134,35 @@ func NewStandardConsumeService(clientId string, 
messageListener MessageListener,
 }
 
 func (fcs *fifoConsumeService) consume(pq ProcessQueue, messageViews 
[]*MessageView) {
-       fcs.consumeIteratively(pq, &messageViews, 0)
+       if !fcs.enableFifoConsumeAccelerator || len(messageViews) <= 1 {
+               fcs.consumeIteratively(pq, &messageViews, 0)
+               return
+       }
+       // Group messages by messageGroup
+       messageViewsGroupByMessageGroup := make(map[string][]*MessageView)
+       messageViewsWithoutMessageGroup := make([]*MessageView, 0)
+       for _, messageView := range messageViews {
+               messageGroup := messageView.GetMessageGroup()
+               if messageGroup != nil && *messageGroup != "" {
+                       messageViewsGroupByMessageGroup[*messageGroup] = 
append(messageViewsGroupByMessageGroup[*messageGroup], messageView)
+               } else {
+                       messageViewsWithoutMessageGroup = 
append(messageViewsWithoutMessageGroup, messageView)
+               }
+       }
+
+       groupNum := len(messageViewsGroupByMessageGroup)
+       if len(messageViewsWithoutMessageGroup) > 0 {
+               groupNum++
+       }
+       sugarBaseLogger.Debugf("FifoConsumeService parallel consume, 
messageViewsNum=%d, groupNum=%d", len(messageViews), groupNum)
+
+       // Consume messages in parallel by group
+       for _, group := range messageViewsGroupByMessageGroup {
+               fcs.consumeIteratively(pq, &group, 0)
+       }
+       if len(messageViewsWithoutMessageGroup) > 0 {
+               fcs.consumeIteratively(pq, &messageViewsWithoutMessageGroup, 0)
+       }
 }
 func (fcs *fifoConsumeService) consumeIteratively(pq ProcessQueue, 
messageViewsPtr *[]*MessageView, ptr int) {
        if messageViewsPtr == nil {
@@ -161,8 +190,9 @@ func (fcs *fifoConsumeService) consumeIteratively(pq 
ProcessQueue, messageViewsP
        })
 }
 
-func NewFiFoConsumeService(clientId string, messageListener MessageListener, 
consumptionExecutor *simpleThreadPool, messageInterceptor MessageInterceptor) 
*fifoConsumeService {
+func NewFiFoConsumeService(clientId string, messageListener MessageListener, 
consumptionExecutor *simpleThreadPool, messageInterceptor MessageInterceptor, 
enableFifoConsumeAccelerator bool) *fifoConsumeService {
        return &fifoConsumeService{
-               *NewBaseConsumeService(clientId, messageListener, 
consumptionExecutor, messageInterceptor),
+               baseConsumeService:            *NewBaseConsumeService(clientId, 
messageListener, consumptionExecutor, messageInterceptor),
+               enableFifoConsumeAccelerator:  enableFifoConsumeAccelerator,
        }
 }
diff --git a/golang/push_consumer.go b/golang/push_consumer.go
index 38788073..df2aa411 100644
--- a/golang/push_consumer.go
+++ b/golang/push_consumer.go
@@ -388,8 +388,8 @@ func (pc *defaultPushConsumer) Start() error {
 
        threadPool := NewSimpleThreadPool("MessageConsumption", 
int(pc.pcOpts.maxCacheMessageCount), int(pc.pcOpts.consumptionThreadCount))
        if pc.pcSettings.isFifo {
-               pc.consumerService = NewFiFoConsumeService(pc.cli.clientID, 
pc.pcOpts.messageListener, threadPool, pc.cli)
-               pc.cli.log.Infof("Create FIFO consume service, 
consumerGroup=%s, clientId=%s", pc.cli.config.ConsumerGroup, pc.cli.clientID)
+               pc.consumerService = NewFiFoConsumeService(pc.cli.clientID, 
pc.pcOpts.messageListener, threadPool, pc.cli, 
pc.pcOpts.enableFifoConsumeAccelerator)
+               pc.cli.log.Infof("Create FIFO consume service, 
consumerGroup=%s, clientId=%s, enableFifoConsumeAccelerator=%t", 
pc.cli.config.ConsumerGroup, pc.cli.clientID, 
pc.pcOpts.enableFifoConsumeAccelerator)
        } else {
                pc.consumerService = NewStandardConsumeService(pc.cli.clientID, 
pc.pcOpts.messageListener, threadPool, pc.cli)
                pc.cli.log.Infof("Create standard consume service, 
consumerGroup=%s, clientId=%s", pc.cli.config.ConsumerGroup, pc.cli.clientID)
diff --git a/golang/push_consumer_options.go b/golang/push_consumer_options.go
index 1fea675c..411fca19 100644
--- a/golang/push_consumer_options.go
+++ b/golang/push_consumer_options.go
@@ -55,21 +55,23 @@ func (l *FuncMessageListener) consume(msg *MessageView) 
ConsumerResult {
 var _ = MessageListener(&FuncMessageListener{})
 
 type pushConsumerOptions struct {
-       subscriptionExpressions    *sync.Map
-       awaitDuration              time.Duration
-       maxCacheMessageCount       int32
-       maxCacheMessageSizeInBytes int64
-       consumptionThreadCount     int32
-       messageListener            MessageListener
-       clientFunc                 NewClientFunc
+       subscriptionExpressions         *sync.Map
+       awaitDuration                   time.Duration
+       maxCacheMessageCount            int32
+       maxCacheMessageSizeInBytes      int64
+       consumptionThreadCount          int32
+       messageListener                 MessageListener
+       clientFunc                      NewClientFunc
+       enableFifoConsumeAccelerator    bool
 }
 
 var defaultPushConsumerOptions = pushConsumerOptions{
-       clientFunc:                 NewClient,
-       awaitDuration:              0,
-       maxCacheMessageCount:       1024,
-       maxCacheMessageSizeInBytes: 64 * 1024 * 1024,
-       consumptionThreadCount:     20,
+       clientFunc:                    NewClient,
+       awaitDuration:                 0,
+       maxCacheMessageCount:          1024,
+       maxCacheMessageSizeInBytes:    64 * 1024 * 1024,
+       consumptionThreadCount:        20,
+       enableFifoConsumeAccelerator:   false,
 }
 
 // A ConsumerOption sets options such as tag, etc.
@@ -136,6 +138,14 @@ func WithPushMessageListener(messageListener 
MessageListener) PushConsumerOption
        })
 }
 
+// WithPushEnableFifoConsumeAccelerator sets enable fifo consume accelerator.
+// If enabled, the consumer will consume messages in parallel by messageGroup,
+func WithPushEnableFifoConsumeAccelerator(enableFifoConsumeAccelerator bool) 
PushConsumerOption {
+       return newFuncPushConsumerOption(func(o *pushConsumerOptions) {
+               o.enableFifoConsumeAccelerator = enableFifoConsumeAccelerator
+       })
+}
+
 var _ = ClientSettings(&pushConsumerSettings{})
 
 type pushConsumerSettings struct {

Reply via email to