This is an automated email from the ASF dual-hosted git repository.

RongtongJin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 7448aa1c [Go] Fix data race in receiveMessage for push and simple 
consumers (#1275)
7448aa1c is described below

commit 7448aa1cb7ab6cc3f19a65a2942ac28cb871113b
Author: guyinyou <[email protected]>
AuthorDate: Mon Jun 15 11:02:03 2026 +0800

    [Go] Fix data race in receiveMessage for push and simple consumers (#1275)
    
    The err and resps variables were shared between the main goroutine
    and a spawned goroutine without synchronization, causing a data race
    detectable by Go's race detector. The goroutine wrote to both
    variables while the select handler read them concurrently.
    
    Move resps and err into the goroutine as local variables, and pass
    results back through a typed channel (receiveResult struct) instead.
    This also fixes push_consumer.go not signaling the done channel on
    non-EOF errors, which previously caused it to wait for context
    timeout instead of returning the actual error immediately.
    
    Affected:
    - golang/push_consumer.go: receiveMessage()
    - golang/simple_consumer.go: receiveMessage()
    
    Co-authored-by: guyinyou <[email protected]>
---
 golang/push_consumer.go   | 31 +++++++++++++++++--------------
 golang/simple_consumer.go | 33 +++++++++++++++++----------------
 2 files changed, 34 insertions(+), 30 deletions(-)

diff --git a/golang/push_consumer.go b/golang/push_consumer.go
index 84dd548a..111722c9 100644
--- a/golang/push_consumer.go
+++ b/golang/push_consumer.go
@@ -206,7 +206,6 @@ func (pc *defaultPushConsumer) GetGroupName() string {
 }
 
 func (pc *defaultPushConsumer) receiveMessage(ctx context.Context, request 
*v2.ReceiveMessageRequest, messageQueue *v2.MessageQueue, timeout 
time.Duration) ([]*MessageView, error) {
-       var err error
        ctx = pc.cli.Sign(ctx)
        ctx, cancel := context.WithTimeout(ctx, timeout)
        defer cancel()
@@ -215,34 +214,38 @@ func (pc *defaultPushConsumer) receiveMessage(ctx 
context.Context, request *v2.R
        if err != nil {
                return nil, err
        }
-       done := make(chan bool, 1)
+       type receiveResult struct {
+               responses []*v2.ReceiveMessageResponse
+               err       error
+       }
+       done := make(chan receiveResult, 1)
 
-       resps := make([]*v2.ReceiveMessageResponse, 0)
        go func() {
+               var resps []*v2.ReceiveMessageResponse
+               var recvErr error
                for {
                        var resp *v2.ReceiveMessageResponse
-                       resp, err = receiveMessageClient.Recv()
-                       if err == io.EOF {
-                               done <- true
-                               defer close(done)
+                       resp, recvErr = receiveMessageClient.Recv()
+                       if recvErr == io.EOF {
+                               done <- receiveResult{responses: resps}
                                break
                        }
-                       if err != nil {
-                               pc.cli.log.Errorf("pushConsumer recv msg 
err=%v, requestId=%s", err, utils.GetRequestID(ctx))
+                       if recvErr != nil {
+                               pc.cli.log.Errorf("pushConsumer recv msg 
err=%v, requestId=%s", recvErr, utils.GetRequestID(ctx))
+                               done <- receiveResult{err: recvErr}
                                break
                        }
                        sugarBaseLogger.Debugf("receiveMessage response: %v", 
resp)
                        resps = append(resps, resp)
                }
-               cancel()
        }()
        select {
        case <-ctx.Done():
                // timeout
                return nil, fmt.Errorf("[error] CODE=DEADLINE_EXCEEDED")
-       case <-done:
-               if err != nil && err != io.EOF {
-                       return nil, err
+       case result := <-done:
+               if result.err != nil {
+                       return nil, result.err
                }
                messageViewList := make([]*MessageView, 0)
                status := &v2.Status{
@@ -251,7 +254,7 @@ func (pc *defaultPushConsumer) receiveMessage(ctx 
context.Context, request *v2.R
                }
                var deliveryTimestamp *timestamppb.Timestamp
                messageList := make([]*v2.Message, 0)
-               for _, resp := range resps {
+               for _, resp := range result.responses {
                        switch r := resp.GetContent().(type) {
                        case *v2.ReceiveMessageResponse_Status:
                                status = r.Status
diff --git a/golang/simple_consumer.go b/golang/simple_consumer.go
index a159afea..4818e05c 100644
--- a/golang/simple_consumer.go
+++ b/golang/simple_consumer.go
@@ -210,7 +210,6 @@ func (sc *defaultSimpleConsumer) GetGroupName() string {
 }
 
 func (sc *defaultSimpleConsumer) receiveMessage(ctx context.Context, request 
*v2.ReceiveMessageRequest, messageQueue *v2.MessageQueue, timeout 
time.Duration) ([]*MessageView, error) {
-       var err error
        ctx = sc.cli.Sign(ctx)
        ctx, cancel := context.WithTimeout(ctx, timeout)
        defer cancel()
@@ -219,36 +218,38 @@ func (sc *defaultSimpleConsumer) receiveMessage(ctx 
context.Context, request *v2
        if err != nil {
                return nil, err
        }
-       done := make(chan bool, 1)
+       type receiveResult struct {
+               responses []*v2.ReceiveMessageResponse
+               err       error
+       }
+       done := make(chan receiveResult, 1)
 
-       resps := make([]*v2.ReceiveMessageResponse, 0)
        go func() {
+               var resps []*v2.ReceiveMessageResponse
+               var recvErr error
                for {
                        var resp *v2.ReceiveMessageResponse
-                       resp, err = receiveMessageClient.Recv()
-                       if err == io.EOF {
-                               done <- true
-                               defer close(done)
+                       resp, recvErr = receiveMessageClient.Recv()
+                       if recvErr == io.EOF {
+                               done <- receiveResult{responses: resps}
                                break
                        }
-                       if err != nil {
-                               sc.cli.log.Errorf("simpleConsumer recv msg 
err=%v, requestId=%s", err, utils.GetRequestID(ctx))
-                               done <- true
-                               defer close(done)
+                       if recvErr != nil {
+                               sc.cli.log.Errorf("simpleConsumer recv msg 
err=%v, requestId=%s", recvErr, utils.GetRequestID(ctx))
+                               done <- receiveResult{err: recvErr}
                                break
                        }
                        sugarBaseLogger.Debugf("receiveMessage response: %v", 
resp)
                        resps = append(resps, resp)
                }
-               cancel()
        }()
        select {
        case <-ctx.Done():
                // timeout
                return nil, fmt.Errorf("[error] CODE=DEADLINE_EXCEEDED")
-       case <-done:
-               if err != nil && err != io.EOF {
-                       return nil, err
+       case result := <-done:
+               if result.err != nil {
+                       return nil, result.err
                }
                messageViewList := make([]*MessageView, 0)
                status := &v2.Status{
@@ -257,7 +258,7 @@ func (sc *defaultSimpleConsumer) receiveMessage(ctx 
context.Context, request *v2
                }
                var deliveryTimestamp *timestamppb.Timestamp
                messageList := make([]*v2.Message, 0)
-               for _, resp := range resps {
+               for _, resp := range result.responses {
                        switch r := resp.GetContent().(type) {
                        case *v2.ReceiveMessageResponse_Status:
                                status = r.Status

Reply via email to