This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch handle_txn_of_chunk_message_reponse
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/handle_txn_of_chunk_message_reponse by this push:
     new 486e885  add test
486e885 is described below

commit 486e885e4c25505f1e4e9403dac2dd0207b7c39a
Author: xiangying <[email protected]>
AuthorDate: Sat Jul 22 20:15:11 2023 +0800

    add test
---
 pulsar/consumer_partition.go |   3 +-
 pulsar/transaction_test.go   | 108 ++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 107 insertions(+), 4 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 74e1a38..a99cec3 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -428,9 +428,8 @@ func (pc *partitionConsumer) ackIDCommon(msgID MessageID, withResponse bool, txn
        if cmid, ok := msgID.(*chunkMessageID); ok {
                if txn == nil {
                        return pc.unAckChunksTracker.ack(cmid)
-               } else {
-                       return pc.unAckChunksTracker.ackWithTxn(cmid, txn)
                }
+               return pc.unAckChunksTracker.ackWithTxn(cmid, txn)
        }
 
        trackingID := toTrackingMessageID(msgID)
diff --git a/pulsar/transaction_test.go b/pulsar/transaction_test.go
index 66a82cc..2cb6ad2 100644
--- a/pulsar/transaction_test.go
+++ b/pulsar/transaction_test.go
@@ -423,6 +423,14 @@ func TestTransactionAbort(t *testing.T) {
        // Abort the transaction.
        _ = txn.Abort(context.Background())
 
+       consumerShouldNotReceiveMessage(t, consumer)
+
+       // Clean up: Close the consumer and producer instances.
+       consumer.Close()
+       producer.Close()
+}
+
+func consumerShouldNotReceiveMessage(t *testing.T, consumer Consumer) {
        // Expectation: The consumer should not receive any messages.
        done := make(chan struct{})
        go func() {
@@ -438,8 +446,104 @@ func TestTransactionAbort(t *testing.T) {
                require.Fail(t, "The consumer should not receive any messages")
        case <-time.After(time.Second):
        }
+}
 
-       // Clean up: Close the consumer and producer instances.
+func TestSendAndAckChunkMessage(t *testing.T) {
+       topic := newTopicName()
+       sub := "my-sub"
+
+       // Prepare: Create PulsarClient and initialize the transaction coordinator client.
+       _, client := createTcClient(t)
+
+       // Create transaction and register the send operation.
+       txn, err := client.NewTransaction(time.Hour)
+       require.Nil(t, err)
+       txn.(*transaction).registerSendOrAckOp()
+
+       // Create a producer with chunking enabled to send a large message that will be split into chunks.
+       producer, err := client.CreateProducer(ProducerOptions{
+               Name:            "test",
+               Topic:           topic,
+               EnableChunking:  true,
+               DisableBatching: true,
+       })
+       require.NoError(t, err)
+       require.NotNil(t, producer)
+       defer producer.Close()
+
+       // Subscribe to the consumer.
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               Type:             Exclusive,
+               SubscriptionName: sub,
+       })
+       require.NoError(t, err)
+       defer consumer.Close()
+
+       // Send a large message that will be split into chunks.
+       msgID, err := producer.Send(context.Background(), &ProducerMessage{
+               Payload: createTestMessagePayload(_brokerMaxMessageSize),
+       })
+       require.NoError(t, err)
+       _, ok := msgID.(*chunkMessageID)
+       require.True(t, ok)
+
+       // Attempt to commit the transaction, it should not succeed at this point.
+       err = txn.Commit(context.Background())
+       require.NotNil(t, err)
+
+       // End the previously registered send operation, allowing the transaction to commit successfully.
+       txn.(*transaction).endSendOrAckOp(nil)
+
+       // Commit the transaction successfully now.
+       err = txn.Commit(context.Background())
+       require.Nil(t, err)
+
+       // Receive the message using a new transaction and ack it.
+       txn2, err := client.NewTransaction(time.Hour)
+       require.Nil(t, err)
+       message, err := consumer.Receive(context.Background())
+       require.Nil(t, err)
+
+       err = consumer.AckWithTxn(message, txn2)
+       require.Nil(t, err)
+
+       txn2.Abort(context.Background())
+
+       // Close the consumer to simulate reconnection and receive the same message again.
        consumer.Close()
-       producer.Close()
+
+       // Subscribe to the consumer again.
+       consumer, err = client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               Type:             Exclusive,
+               SubscriptionName: sub,
+       })
+       require.Nil(t, err)
+       message, err = consumer.Receive(context.Background())
+       require.Nil(t, err)
+       require.NotNil(t, message)
+
+       // Create a new transaction and ack the message again.
+       txn3, err := client.NewTransaction(time.Hour)
+       require.Nil(t, err)
+
+       err = consumer.AckWithTxn(message, txn3)
+       require.Nil(t, err)
+
+       // Commit the third transaction.
+       err = txn3.Commit(context.Background())
+       require.Nil(t, err)
+
+       // Close the consumer again.
+       consumer.Close()
+
+       // Subscribe to the consumer again and verify that no message is received.
+       consumer, err = client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               Type:             Exclusive,
+               SubscriptionName: sub,
+       })
+       require.Nil(t, err)
+       consumerShouldNotReceiveMessage(t, consumer)
 }

Reply via email to