This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new 5d49f15c [improve] Add Testcase to test using keyShared subscription and delayed messages at the same time (#1361) 5d49f15c is described below commit 5d49f15ceebcf31cdfb41f16e8a2fc9f2d6fbd17 Author: zhou zhuohan <843520...@qq.com> AuthorDate: Wed Jun 11 20:30:10 2025 +0800 [improve] Add Testcase to test using keyShared subscription and delayed messages at the same time (#1361) Master Issue: https://github.com/apache/pulsar/issues/23968 Related pr: https://github.com/apache/pulsar-client-go/pull/1339 ### Motivation As noted in this PR [comment](https://github.com/apache/pulsar-client-go/pull/1339#issuecomment-2712819343), there are no test cases that use the keyShared subscription mode to consume delayed messages, so we should add one. ### Modifications Add `pulsar/consumer_test/TestConsumerKeySharedWithDelayedMessages` test case. --- Makefile | 2 +- pulsar/consumer_test.go | 90 ++++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 87 insertions(+), 5 deletions(-) diff --git a/Makefile b/Makefile index 3ade7200..06e18025 100644 --- a/Makefile +++ b/Makefile @@ -50,7 +50,7 @@ bin/golangci-lint: # use golangCi-lint docker to avoid local golang env issues # https://golangci-lint.run/welcome/install/ lint-docker: - docker run --rm -v $(shell pwd):/app -w /app golangci/golangci-lint:v1.51.2 golangci-lint run -v + docker run --rm -v $(shell pwd):/app -w /app golangci/golangci-lint:v1.61.0 golangci-lint run -v container: docker build -t ${IMAGE_NAME} \ diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 59773e33..2351e202 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -336,11 +336,93 @@ func TestConsumerKeyShared(t *testing.T) { assert.NotEqual(t, 0, receivedConsumer1) assert.NotEqual(t, 0, receivedConsumer2) - t.Logf("TestConsumerKeyShared received messages consumer1: %d consumser2: %d\n", + t.Logf("TestConsumerKeyShared received messages 
consumer1: %d consumer2: %d\n", receivedConsumer1, receivedConsumer2) assert.Equal(t, 100, receivedConsumer1+receivedConsumer2) } +// TestConsumerKeySharedWithDelayedMessages +// test using delayed messages and key-shared sub mode at the same time +func TestConsumerKeySharedWithDelayedMessages(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + assert.Nil(t, err) + defer client.Close() + topic := newTopicName() + + consumer1, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "sub-1", + Type: KeyShared, + }) + assert.Nil(t, err) + defer consumer1.Close() + consumer2, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "sub-1", + Type: KeyShared, + }) + assert.Nil(t, err) + defer consumer2.Close() + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + }) + assert.Nil(t, err) + defer producer.Close() + ctx := context.Background() + startTime := time.Now() + delayTime := 3 * time.Second + for i := 0; i < 100; i++ { + _, err := producer.Send(ctx, &ProducerMessage{ + Key: fmt.Sprintf("key-shared-%d", i%3), + Payload: []byte(fmt.Sprintf("value-%d", i)), + DeliverAfter: delayTime, + }) + assert.Nil(t, err) + } + + receivedConsumer1 := 0 + receivedConsumer2 := 0 + timeoutTimer := time.After(2 * delayTime) + for (receivedConsumer1 + receivedConsumer2) < 100 { + select { + case <-timeoutTimer: + break + default: + } + + select { + case cm, ok := <-consumer1.Chan(): + if !ok { + break + } + receivedConsumer1++ + _ = consumer1.Ack(cm.Message) + assert.GreaterOrEqual(t, time.Since(startTime), delayTime, + "TestConsumerKeySharedWithDelayedMessages should delay messages later than defined deliverAfter time", + ) + case cm, ok := <-consumer2.Chan(): + if !ok { + break + } + receivedConsumer2++ + _ = consumer2.Ack(cm.Message) + assert.GreaterOrEqual(t, time.Since(startTime), delayTime, + "TestConsumerKeySharedWithDelayedMessages should delay messages later than defined 
deliverAfter time", + ) + } + } + + assert.NotEqual(t, 0, receivedConsumer1) + assert.NotEqual(t, 0, receivedConsumer2) + assert.Equal(t, 100, receivedConsumer1+receivedConsumer2) + t.Logf("TestConsumerKeySharedWithDelayedMessages received messages consumer1: %d consumer2: %d, timecost: %d\n", + receivedConsumer1, receivedConsumer2, time.Since(startTime).Milliseconds(), + ) +} + func TestPartitionTopicsConsumerPubSub(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, @@ -2704,11 +2786,11 @@ func TestKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { assert.Equal(t, len(consumer1Keys)*MsgBatchCount, receivedConsumer1) assert.Equal(t, len(consumer2Keys)*MsgBatchCount, receivedConsumer2) - t.Logf("TestKeyBasedBatchProducerConsumerKeyShared received messages consumer1: %d consumser2: %d\n", + t.Logf("TestKeyBasedBatchProducerConsumerKeyShared received messages consumer1: %d consumer2: %d\n", receivedConsumer1, receivedConsumer2) assert.Equal(t, 300, receivedConsumer1+receivedConsumer2) - t.Logf("TestKeyBasedBatchProducerConsumerKeyShared received messages keys consumer1: %v consumser2: %v\n", + t.Logf("TestKeyBasedBatchProducerConsumerKeyShared received messages keys consumer1: %v consumer2: %v\n", consumer1Keys, consumer2Keys) } @@ -2887,7 +2969,7 @@ func TestConsumerKeySharedWithOrderingKey(t *testing.T) { assert.NotEqual(t, 0, receivedConsumer2) t.Logf( - "TestConsumerKeySharedWithOrderingKey received messages consumer1: %d consumser2: %d\n", + "TestConsumerKeySharedWithOrderingKey received messages consumer1: %d consumer2: %d\n", receivedConsumer1, receivedConsumer2, ) assert.Equal(t, 100, receivedConsumer1+receivedConsumer2)