This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch handle_txn_of_chunk_message_reponse
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to
refs/heads/handle_txn_of_chunk_message_reponse by this push:
new 486e885 add test
486e885 is described below
commit 486e885e4c25505f1e4e9403dac2dd0207b7c39a
Author: xiangying <[email protected]>
AuthorDate: Sat Jul 22 20:15:11 2023 +0800
add test
---
pulsar/consumer_partition.go | 3 +-
pulsar/transaction_test.go | 108 ++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 107 insertions(+), 4 deletions(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 74e1a38..a99cec3 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -428,9 +428,8 @@ func (pc *partitionConsumer) ackIDCommon(msgID MessageID, withResponse bool, txn
if cmid, ok := msgID.(*chunkMessageID); ok {
if txn == nil {
return pc.unAckChunksTracker.ack(cmid)
- } else {
- return pc.unAckChunksTracker.ackWithTxn(cmid, txn)
}
+ return pc.unAckChunksTracker.ackWithTxn(cmid, txn)
}
trackingID := toTrackingMessageID(msgID)
diff --git a/pulsar/transaction_test.go b/pulsar/transaction_test.go
index 66a82cc..2cb6ad2 100644
--- a/pulsar/transaction_test.go
+++ b/pulsar/transaction_test.go
@@ -423,6 +423,14 @@ func TestTransactionAbort(t *testing.T) {
// Abort the transaction.
_ = txn.Abort(context.Background())
+ consumerShouldNotReceiveMessage(t, consumer)
+
+ // Clean up: Close the consumer and producer instances.
+ consumer.Close()
+ producer.Close()
+}
+
+func consumerShouldNotReceiveMessage(t *testing.T, consumer Consumer) {
// Expectation: The consumer should not receive any messages.
done := make(chan struct{})
go func() {
@@ -438,8 +446,104 @@ func TestTransactionAbort(t *testing.T) {
require.Fail(t, "The consumer should not receive any messages")
case <-time.After(time.Second):
}
+}
- // Clean up: Close the consumer and producer instances.
+func TestSendAndAckChunkMessage(t *testing.T) {
+ topic := newTopicName()
+ sub := "my-sub"
+
+ // Prepare: Create PulsarClient and initialize the transaction coordinator client.
+ _, client := createTcClient(t)
+
+ // Create transaction and register the send operation.
+ txn, err := client.NewTransaction(time.Hour)
+ require.Nil(t, err)
+ txn.(*transaction).registerSendOrAckOp()
+
+ // Create a producer with chunking enabled to send a large message that will be split into chunks.
+ producer, err := client.CreateProducer(ProducerOptions{
+ Name: "test",
+ Topic: topic,
+ EnableChunking: true,
+ DisableBatching: true,
+ })
+ require.NoError(t, err)
+ require.NotNil(t, producer)
+ defer producer.Close()
+
+ // Subscribe to the consumer.
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ Type: Exclusive,
+ SubscriptionName: sub,
+ })
+ require.NoError(t, err)
+ defer consumer.Close()
+
+ // Send a large message that will be split into chunks.
+ msgID, err := producer.Send(context.Background(), &ProducerMessage{
+ Payload: createTestMessagePayload(_brokerMaxMessageSize),
+ })
+ require.NoError(t, err)
+ _, ok := msgID.(*chunkMessageID)
+ require.True(t, ok)
+
+ // Attempt to commit the transaction, it should not succeed at this point.
+ err = txn.Commit(context.Background())
+ require.NotNil(t, err)
+
+ // End the previously registered send operation, allowing the transaction to commit successfully.
+ txn.(*transaction).endSendOrAckOp(nil)
+
+ // Commit the transaction successfully now.
+ err = txn.Commit(context.Background())
+ require.Nil(t, err)
+
+ // Receive the message using a new transaction and ack it.
+ txn2, err := client.NewTransaction(time.Hour)
+ require.Nil(t, err)
+ message, err := consumer.Receive(context.Background())
+ require.Nil(t, err)
+
+ err = consumer.AckWithTxn(message, txn2)
+ require.Nil(t, err)
+
+ txn2.Abort(context.Background())
+
+ // Close the consumer to simulate reconnection and receive the same message again.
consumer.Close()
- producer.Close()
+
+ // Subscribe to the consumer again.
+ consumer, err = client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ Type: Exclusive,
+ SubscriptionName: sub,
+ })
+ require.Nil(t, err)
+ message, err = consumer.Receive(context.Background())
+ require.Nil(t, err)
+ require.NotNil(t, message)
+
+ // Create a new transaction and ack the message again.
+ txn3, err := client.NewTransaction(time.Hour)
+ require.Nil(t, err)
+
+ err = consumer.AckWithTxn(message, txn3)
+ require.Nil(t, err)
+
+ // Commit the third transaction.
+ err = txn3.Commit(context.Background())
+ require.Nil(t, err)
+
+ // Close the consumer again.
+ consumer.Close()
+
+ // Subscribe to the consumer again and verify that no message is received.
+ consumer, err = client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ Type: Exclusive,
+ SubscriptionName: sub,
+ })
+ require.Nil(t, err)
+ consumerShouldNotReceiveMessage(t, consumer)
}