This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.18.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit 81f22893c3efc357a7812f7384f62579f8c7f5ac Author: Thomas Bousquet <[email protected]> AuthorDate: Tue Dec 2 19:49:45 2025 -0800 fix: return error when the client transaction coordinator is nil to p… (#1444) * fix: return error when the client transaction coordinator is nil to prevent panic * test: add testcase to ensure error is actually returned (cherry picked from commit 6ab93d5a1d8104b92a419f0abb14a3c0a9f684c4) --- pulsar/client_impl.go | 8 ++++++++ pulsar/transaction_test.go | 36 +++++++++++++++++++++++++----------- 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go index 809e4d72..d940367f 100644 --- a/pulsar/client_impl.go +++ b/pulsar/client_impl.go @@ -23,6 +23,8 @@ import ( "sync" "time" + "errors" + "github.com/apache/pulsar-client-go/pulsar/auth" "github.com/apache/pulsar-client-go/pulsar/internal" "github.com/apache/pulsar-client-go/pulsar/log" @@ -39,6 +41,8 @@ const ( minConnMaxIdleTime = 60 * time.Second ) +var ErrClientTransactionsNotEnabled = errors.New("transactions are not enabled with the client") + type client struct { cnxPool internal.ConnectionPool rpcClient internal.RPCClient @@ -196,6 +200,10 @@ func newClient(options ClientOptions) (Client, error) { } func (c *client) NewTransaction(timeout time.Duration) (Transaction, error) { + if c.tcClient == nil { + return nil, ErrClientTransactionsNotEnabled + } + id, err := c.tcClient.newTransaction(timeout) if err != nil { return nil, err diff --git a/pulsar/transaction_test.go b/pulsar/transaction_test.go index 75b36ea8..9921ea03 100644 --- a/pulsar/transaction_test.go +++ b/pulsar/transaction_test.go @@ -35,7 +35,7 @@ func TestTxn_TCClient(t *testing.T) { //1. Prepare: create PulsarClient and init transaction coordinator client. topic := newTopicName() sub := "my-sub" - tc, client := createTcClient(t) + tc, client := createClientWithTC(t) //2. Prepare: create Topic and Subscription. consumer, err := client.Subscribe(ConsumerOptions{ Topic: topic, @@ -100,7 +100,7 @@ func TestTxn_TCClient(t *testing.T) { // Test abort and commit txn func TestTxn_ImplCommitOrAbort(t *testing.T) { - tc, _ := createTcClient(t) + tc, _ := createClientWithTC(t) //1. Open a transaction and then commit it. //The operations of committing txn1 should success at the first time and fail at the second time. txn1 := createTxn(tc, t) @@ -133,7 +133,7 @@ func TestTxn_ImplCommitOrAbort(t *testing.T) { // Test the internal API including the registerSendOrAckOp and endSendOrAckOp. func TestTxn_RegisterOpAndEndOp(t *testing.T) { - tc, _ := createTcClient(t) + tc, _ := createClientWithTC(t) //1. Register 4 operation but only end 3 operations, the transaction can not be committed or aborted. res := registerOpAndEndOp(t, tc, 4, 3, nil, true) assert.Equal(t, res.(*Error).Result(), TimeoutError) @@ -158,7 +158,7 @@ func TestTxn_RegisterTopic(t *testing.T) { //1. Prepare: create PulsarClient and init transaction coordinator client. topic := newTopicName() sub := "my-sub" - tc, client := createTcClient(t) + tc, client := createClientWithTC(t) //2. Prepare: create Topic and Subscription. _, err := client.Subscribe(ConsumerOptions{ Topic: topic, @@ -205,8 +205,22 @@ func createTxn(tc *transactionCoordinatorClient, t *testing.T) *transaction { return newTransaction(*id, tc, txnTimeout) } -// createTcClient Create a transaction coordinator client to send request -func createTcClient(t *testing.T) (*transactionCoordinatorClient, *client) { +func TestTxn_NoTransactionCoordinator(t *testing.T) { + clientWithNoTC, err := NewClient(ClientOptions{ + URL: webServiceURLTLS, + TLSTrustCertsFilePath: caCertsPath, + Authentication: NewAuthenticationTLS(tlsClientCertPath, tlsClientKeyPath), + EnableTransaction: false, + }) + require.Nil(t, err, "Failed to create client.") + + tx, err := clientWithNoTC.NewTransaction(txnTimeout) + require.Nil(t, tx, "Did not fail creating a new transaction, transaction should be nil") + require.ErrorIs(t, err, ErrClientTransactionsNotEnabled) +} + +// createClientWithTC creates a new client with a transaction coordinator client to send request +func createClientWithTC(t *testing.T) (*transactionCoordinatorClient, *client) { c, err := NewClient(ClientOptions{ URL: webServiceURLTLS, TLSTrustCertsFilePath: caCertsPath, @@ -236,7 +250,7 @@ func TestTxn_ConsumeAndProduce(t *testing.T) { // Step 1: Prepare - Create PulsarClient and initialize the transaction coordinator client. topic := newTopicName() sub := "my-sub" - _, client := createTcClient(t) + _, client := createClientWithTC(t) // Step 2: Prepare - Create Topic and Subscription. consumer, err := client.Subscribe(ConsumerOptions{ Topic: topic, @@ -330,7 +344,7 @@ func TestTxn_AckAndSend(t *testing.T) { sourceTopic := newTopicName() sinkTopic := newTopicName() sub := "my-sub" - _, client := createTcClient(t) + _, client := createClientWithTC(t) // Prepare: Create source and sink topics and subscriptions. sourceConsumer, _ := client.Subscribe(ConsumerOptions{ @@ -399,7 +413,7 @@ func TestTxn_TransactionAbort(t *testing.T) { // Prepare: Create PulsarClient and initialize the transaction coordinator client. topic := newTopicName() sub := "my-sub" - _, client := createTcClient(t) + _, client := createClientWithTC(t) // Prepare: Create Topic and Subscription. consumer, _ := client.Subscribe(ConsumerOptions{ @@ -456,7 +470,7 @@ func TestTxn_AckChunkMessage(t *testing.T) { sub := "my-sub" // Prepare: Create PulsarClient and initialize the transaction coordinator client. - _, client := createTcClient(t) + _, client := createClientWithTC(t) // Create transaction and register the send operation. txn, err := client.NewTransaction(txnTimeout) @@ -548,7 +562,7 @@ func TestTxn_ConnReconnect(t *testing.T) { defer cancel() topic := newTopicName() - _, cli := createTcClient(t) + _, cli := createClientWithTC(t) txn, err := cli.NewTransaction(5 * time.Minute) assert.NoError(t, err)
