This is an automated email from the ASF dual-hosted git repository.
RongtongJin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new f17e6dff [Go] Fix Commit/RollBack always returning nil in transaction
(#1271)
f17e6dff is described below
commit f17e6dff2938b9c4b5a9514e5acfae2dfa88fd16
Author: guyinyou <[email protected]>
AuthorDate: Fri Jun 12 10:31:25 2026 +0800
[Go] Fix Commit/RollBack always returning nil in transaction (#1271)
The Commit() and RollBack() methods in transactionImpl swallowed errors
from endTransaction: failures were only logged inside the Range callback
and never propagated, so the caller always received nil and could not
detect a failed commit or rollback.
What changed:
- Capture the error from the Range closure and return it to the caller.
- Stop Range iteration on the first error (matching Java's fail-fast behavior).
- Return an error when messageSendReceiptMap is empty, aligning with the
Java client's IllegalStateException ("Transactional message has not been
sent yet").
Affected: golang/transaction.go Commit(), RollBack()
Co-authored-by: guyinyou <[email protected]>
---
golang/transaction.go | 24 ++++++++++++++++++++----
1 file changed, 20 insertions(+), 4 deletions(-)
diff --git a/golang/transaction.go b/golang/transaction.go
index 89839616..b88205b1 100644
--- a/golang/transaction.go
+++ b/golang/transaction.go
@@ -61,31 +61,47 @@ var NewTransactionImpl = func(producerImpl Producer)
*transactionImpl {
}
func (t *transactionImpl) Commit() error {
+ isEmpty := true
+ var commitErr error
t.messageSendReceiptMap.Range(func(_, value interface{}) bool {
+ isEmpty = false
pubMessage := value.([]interface{})[0].(*PublishingMessage)
sendReceipt := value.([]interface{})[1].(*SendReceipt)
err :=
t.producerImpl.(*defaultProducer).endTransaction(context.TODO(),
sendReceipt.Endpoints,
pubMessage.msg.GetMessageCommon(),
sendReceipt.MessageID, sendReceipt.TransactionId, COMMIT)
if err != nil {
- sugarBaseLogger.Errorf("transaction message commit
failed, err=%w", err)
+ commitErr = fmt.Errorf("transaction message commit
failed: %w", err)
+ sugarBaseLogger.Errorf("%v", commitErr)
+ return false
}
return true
})
- return nil
+ if isEmpty {
+ return fmt.Errorf("transactional message has not been sent yet")
+ }
+ return commitErr
}
func (t *transactionImpl) RollBack() error {
+ isEmpty := true
+ var rollbackErr error
t.messageSendReceiptMap.Range(func(_, value interface{}) bool {
+ isEmpty = false
pubMessage := value.([]interface{})[0].(*PublishingMessage)
sendReceipt := value.([]interface{})[1].(*SendReceipt)
err :=
t.producerImpl.(*defaultProducer).endTransaction(context.TODO(),
sendReceipt.Endpoints,
pubMessage.msg.GetMessageCommon(),
sendReceipt.MessageID, sendReceipt.TransactionId, ROLLBACK)
if err != nil {
- sugarBaseLogger.Errorf("transaction message rollback
failed, err=%w", err)
+ rollbackErr = fmt.Errorf("transaction message rollback
failed: %w", err)
+ sugarBaseLogger.Errorf("%v", rollbackErr)
+ return false
}
return true
})
- return nil
+ if isEmpty {
+ return fmt.Errorf("transactional message has not been sent yet")
+ }
+ return rollbackErr
}
func (t *transactionImpl) tryAddMessage(message *Message, namespace string)
(*PublishingMessage, error) {