This is an automated email from the ASF dual-hosted git repository.

RongtongJin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new f17e6dff [Go] Fix Commit/RollBack always returning nil in transaction 
(#1271)
f17e6dff is described below

commit f17e6dff2938b9c4b5a9514e5acfae2dfa88fd16
Author: guyinyou <[email protected]>
AuthorDate: Fri Jun 12 10:31:25 2026 +0800

    [Go] Fix Commit/RollBack always returning nil in transaction (#1271)
    
    The Commit() and RollBack() methods in transactionImpl silently swallowed
    errors from endTransaction: errors were only logged inside the Range
    callback, and the caller always received nil, making it impossible to
    detect a failed commit or rollback.
    
    What changed:
    - Capture the error from the Range closure and return it to the caller.
    - Stop Range iteration on first error (matching Java's fail-fast behavior).
    - Return an error when messageSendReceiptMap is empty, aligning with the
      Java client's IllegalStateException ("Transactional message has not been
      sent yet").
    
    Affected: golang/transaction.go Commit(), RollBack()
    
    Co-authored-by: guyinyou <[email protected]>
---
 golang/transaction.go | 24 ++++++++++++++++++++----
 1 file changed, 20 insertions(+), 4 deletions(-)

diff --git a/golang/transaction.go b/golang/transaction.go
index 89839616..b88205b1 100644
--- a/golang/transaction.go
+++ b/golang/transaction.go
@@ -61,31 +61,47 @@ var NewTransactionImpl = func(producerImpl Producer) 
*transactionImpl {
 }
 
 func (t *transactionImpl) Commit() error {
+       isEmpty := true
+       var commitErr error
        t.messageSendReceiptMap.Range(func(_, value interface{}) bool {
+               isEmpty = false
                pubMessage := value.([]interface{})[0].(*PublishingMessage)
                sendReceipt := value.([]interface{})[1].(*SendReceipt)
                err := 
t.producerImpl.(*defaultProducer).endTransaction(context.TODO(), 
sendReceipt.Endpoints,
                        pubMessage.msg.GetMessageCommon(), 
sendReceipt.MessageID, sendReceipt.TransactionId, COMMIT)
                if err != nil {
-                       sugarBaseLogger.Errorf("transaction message commit 
failed, err=%w", err)
+                       commitErr = fmt.Errorf("transaction message commit 
failed: %w", err)
+                       sugarBaseLogger.Errorf("%v", commitErr)
+                       return false
                }
                return true
        })
-       return nil
+       if isEmpty {
+               return fmt.Errorf("transactional message has not been sent yet")
+       }
+       return commitErr
 }
 
 func (t *transactionImpl) RollBack() error {
+       isEmpty := true
+       var rollbackErr error
        t.messageSendReceiptMap.Range(func(_, value interface{}) bool {
+               isEmpty = false
                pubMessage := value.([]interface{})[0].(*PublishingMessage)
                sendReceipt := value.([]interface{})[1].(*SendReceipt)
                err := 
t.producerImpl.(*defaultProducer).endTransaction(context.TODO(), 
sendReceipt.Endpoints,
                        pubMessage.msg.GetMessageCommon(), 
sendReceipt.MessageID, sendReceipt.TransactionId, ROLLBACK)
                if err != nil {
-                       sugarBaseLogger.Errorf("transaction message rollback 
failed, err=%w", err)
+                       rollbackErr = fmt.Errorf("transaction message rollback 
failed: %w", err)
+                       sugarBaseLogger.Errorf("%v", rollbackErr)
+                       return false
                }
                return true
        })
-       return nil
+       if isEmpty {
+               return fmt.Errorf("transactional message has not been sent yet")
+       }
+       return rollbackErr
 }
 
 func (t *transactionImpl) tryAddMessage(message *Message, namespace string) 
(*PublishingMessage, error) {

Reply via email to