This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 6ab93d5a fix: return error when the client transaction coordinator is
nil to prevent panic (#1444)
6ab93d5a is described below
commit 6ab93d5a1d8104b92a419f0abb14a3c0a9f684c4
Author: Thomas Bousquet <[email protected]>
AuthorDate: Tue Dec 2 19:49:45 2025 -0800
fix: return error when the client transaction coordinator is nil to prevent
panic (#1444)
* fix: return error when the client transaction coordinator is nil to
prevent panic
* test: add testcase to ensure error is actually returned
---
pulsar/client_impl.go | 8 ++++++++
pulsar/transaction_test.go | 36 +++++++++++++++++++++++++-----------
2 files changed, 33 insertions(+), 11 deletions(-)
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 809e4d72..d940367f 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -23,6 +23,8 @@ import (
"sync"
"time"
+ "errors"
+
"github.com/apache/pulsar-client-go/pulsar/auth"
"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/log"
@@ -39,6 +41,8 @@ const (
minConnMaxIdleTime = 60 * time.Second
)
+var ErrClientTransactionsNotEnabled = errors.New("transactions are not enabled with the client")
+
type client struct {
cnxPool internal.ConnectionPool
rpcClient internal.RPCClient
@@ -196,6 +200,10 @@ func newClient(options ClientOptions) (Client, error) {
}
func (c *client) NewTransaction(timeout time.Duration) (Transaction, error) {
+ if c.tcClient == nil {
+ return nil, ErrClientTransactionsNotEnabled
+ }
+
id, err := c.tcClient.newTransaction(timeout)
if err != nil {
return nil, err
diff --git a/pulsar/transaction_test.go b/pulsar/transaction_test.go
index 75b36ea8..9921ea03 100644
--- a/pulsar/transaction_test.go
+++ b/pulsar/transaction_test.go
@@ -35,7 +35,7 @@ func TestTxn_TCClient(t *testing.T) {
//1. Prepare: create PulsarClient and init transaction coordinator client.
topic := newTopicName()
sub := "my-sub"
- tc, client := createTcClient(t)
+ tc, client := createClientWithTC(t)
//2. Prepare: create Topic and Subscription.
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
@@ -100,7 +100,7 @@ func TestTxn_TCClient(t *testing.T) {
// Test abort and commit txn
func TestTxn_ImplCommitOrAbort(t *testing.T) {
- tc, _ := createTcClient(t)
+ tc, _ := createClientWithTC(t)
//1. Open a transaction and then commit it.
//The operations of committing txn1 should success at the first time and fail at the second time.
txn1 := createTxn(tc, t)
@@ -133,7 +133,7 @@ func TestTxn_ImplCommitOrAbort(t *testing.T) {
// Test the internal API including the registerSendOrAckOp and endSendOrAckOp.
func TestTxn_RegisterOpAndEndOp(t *testing.T) {
- tc, _ := createTcClient(t)
+ tc, _ := createClientWithTC(t)
//1. Register 4 operation but only end 3 operations, the transaction can not be committed or aborted.
res := registerOpAndEndOp(t, tc, 4, 3, nil, true)
assert.Equal(t, res.(*Error).Result(), TimeoutError)
@@ -158,7 +158,7 @@ func TestTxn_RegisterTopic(t *testing.T) {
//1. Prepare: create PulsarClient and init transaction coordinator client.
topic := newTopicName()
sub := "my-sub"
- tc, client := createTcClient(t)
+ tc, client := createClientWithTC(t)
//2. Prepare: create Topic and Subscription.
_, err := client.Subscribe(ConsumerOptions{
Topic: topic,
@@ -205,8 +205,22 @@ func createTxn(tc *transactionCoordinatorClient, t *testing.T) *transaction {
return newTransaction(*id, tc, txnTimeout)
}
-// createTcClient Create a transaction coordinator client to send request
-func createTcClient(t *testing.T) (*transactionCoordinatorClient, *client) {
+func TestTxn_NoTransactionCoordinator(t *testing.T) {
+ clientWithNoTC, err := NewClient(ClientOptions{
+ URL: webServiceURLTLS,
+ TLSTrustCertsFilePath: caCertsPath,
+ Authentication: NewAuthenticationTLS(tlsClientCertPath, tlsClientKeyPath),
+ EnableTransaction: false,
+ })
+ require.Nil(t, err, "Failed to create client.")
+
+ tx, err := clientWithNoTC.NewTransaction(txnTimeout)
+ require.Nil(t, tx, "Did not fail creating a new transaction, transaction should be nil")
+ require.ErrorIs(t, err, ErrClientTransactionsNotEnabled)
+}
+
+// createClientWithTC creates a new client with a transaction coordinator client to send request
+func createClientWithTC(t *testing.T) (*transactionCoordinatorClient, *client) {
c, err := NewClient(ClientOptions{
URL: webServiceURLTLS,
TLSTrustCertsFilePath: caCertsPath,
@@ -236,7 +250,7 @@ func TestTxn_ConsumeAndProduce(t *testing.T) {
// Step 1: Prepare - Create PulsarClient and initialize the transaction coordinator client.
topic := newTopicName()
sub := "my-sub"
- _, client := createTcClient(t)
+ _, client := createClientWithTC(t)
// Step 2: Prepare - Create Topic and Subscription.
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
@@ -330,7 +344,7 @@ func TestTxn_AckAndSend(t *testing.T) {
sourceTopic := newTopicName()
sinkTopic := newTopicName()
sub := "my-sub"
- _, client := createTcClient(t)
+ _, client := createClientWithTC(t)
// Prepare: Create source and sink topics and subscriptions.
sourceConsumer, _ := client.Subscribe(ConsumerOptions{
@@ -399,7 +413,7 @@ func TestTxn_TransactionAbort(t *testing.T) {
// Prepare: Create PulsarClient and initialize the transaction coordinator client.
topic := newTopicName()
sub := "my-sub"
- _, client := createTcClient(t)
+ _, client := createClientWithTC(t)
// Prepare: Create Topic and Subscription.
consumer, _ := client.Subscribe(ConsumerOptions{
@@ -456,7 +470,7 @@ func TestTxn_AckChunkMessage(t *testing.T) {
sub := "my-sub"
// Prepare: Create PulsarClient and initialize the transaction coordinator client.
- _, client := createTcClient(t)
+ _, client := createClientWithTC(t)
// Create transaction and register the send operation.
txn, err := client.NewTransaction(txnTimeout)
@@ -548,7 +562,7 @@ func TestTxn_ConnReconnect(t *testing.T) {
defer cancel()
topic := newTopicName()
- _, cli := createTcClient(t)
+ _, cli := createClientWithTC(t)
txn, err := cli.NewTransaction(5 * time.Minute)
assert.NoError(t, err)