This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d49f15c [improve] Add Testcase to test using keyShared subscription 
and delayed messages at the same time (#1361)
5d49f15c is described below

commit 5d49f15ceebcf31cdfb41f16e8a2fc9f2d6fbd17
Author: zhou zhuohan <843520...@qq.com>
AuthorDate: Wed Jun 11 20:30:10 2025 +0800

    [improve] Add Testcase to test using keyShared subscription and delayed 
messages at the same time (#1361)
    
    Master Issue: https://github.com/apache/pulsar/issues/23968
    Related pr: https://github.com/apache/pulsar-client-go/pull/1339
    
    ### Motivation
    As noted in this PR 
[comment](https://github.com/apache/pulsar-client-go/pull/1339#issuecomment-2712819343),
 there are no test cases that use the KeyShared subscription mode to consume 
delayed messages, so this commit adds one.
    
    
    ### Modifications
    Add the `TestConsumerKeySharedWithDelayedMessages` test case to 
`pulsar/consumer_test.go`.
---
 Makefile                |  2 +-
 pulsar/consumer_test.go | 90 ++++++++++++++++++++++++++++++++++++++++++++++---
 2 files changed, 87 insertions(+), 5 deletions(-)

diff --git a/Makefile b/Makefile
index 3ade7200..06e18025 100644
--- a/Makefile
+++ b/Makefile
@@ -50,7 +50,7 @@ bin/golangci-lint:
 # use golangCi-lint docker to avoid local golang env issues
 # https://golangci-lint.run/welcome/install/
 lint-docker:
-       docker run --rm -v $(shell pwd):/app -w /app 
golangci/golangci-lint:v1.51.2 golangci-lint run -v
+       docker run --rm -v $(shell pwd):/app -w /app 
golangci/golangci-lint:v1.61.0 golangci-lint run -v
 
 container:
        docker build -t ${IMAGE_NAME} \
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 59773e33..2351e202 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -336,11 +336,93 @@ func TestConsumerKeyShared(t *testing.T) {
        assert.NotEqual(t, 0, receivedConsumer1)
        assert.NotEqual(t, 0, receivedConsumer2)
 
-       t.Logf("TestConsumerKeyShared received messages consumer1: %d 
consumser2: %d\n",
+       t.Logf("TestConsumerKeyShared received messages consumer1: %d 
consumer2: %d\n",
                receivedConsumer1, receivedConsumer2)
        assert.Equal(t, 100, receivedConsumer1+receivedConsumer2)
 }
 
+// TestConsumerKeySharedWithDelayedMessages
+// test using delayed messages and key-shared sub mode at the same time
+func TestConsumerKeySharedWithDelayedMessages(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+       topic := newTopicName()
+
+       consumer1, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: "sub-1",
+               Type:             KeyShared,
+       })
+       assert.Nil(t, err)
+       defer consumer1.Close()
+       consumer2, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: "sub-1",
+               Type:             KeyShared,
+       })
+       assert.Nil(t, err)
+       defer consumer2.Close()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topic,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+       ctx := context.Background()
+       startTime := time.Now()
+       delayTime := 3 * time.Second
+       for i := 0; i < 100; i++ {
+               _, err := producer.Send(ctx, &ProducerMessage{
+                       Key:          fmt.Sprintf("key-shared-%d", i%3),
+                       Payload:      []byte(fmt.Sprintf("value-%d", i)),
+                       DeliverAfter: delayTime,
+               })
+               assert.Nil(t, err)
+       }
+
+       receivedConsumer1 := 0
+       receivedConsumer2 := 0
+       timeoutTimer := time.After(2 * delayTime)
+       for (receivedConsumer1 + receivedConsumer2) < 100 {
+               select {
+               case <-timeoutTimer:
+                       break
+               default:
+               }
+
+               select {
+               case cm, ok := <-consumer1.Chan():
+                       if !ok {
+                               break
+                       }
+                       receivedConsumer1++
+                       _ = consumer1.Ack(cm.Message)
+                       assert.GreaterOrEqual(t, time.Since(startTime), 
delayTime,
+                               "TestConsumerKeySharedWithDelayedMessages 
should delay messages later than defined deliverAfter time",
+                       )
+               case cm, ok := <-consumer2.Chan():
+                       if !ok {
+                               break
+                       }
+                       receivedConsumer2++
+                       _ = consumer2.Ack(cm.Message)
+                       assert.GreaterOrEqual(t, time.Since(startTime), 
delayTime,
+                               "TestConsumerKeySharedWithDelayedMessages 
should delay messages later than defined deliverAfter time",
+                       )
+               }
+       }
+
+       assert.NotEqual(t, 0, receivedConsumer1)
+       assert.NotEqual(t, 0, receivedConsumer2)
+       assert.Equal(t, 100, receivedConsumer1+receivedConsumer2)
+       t.Logf("TestConsumerKeySharedWithDelayedMessages received messages 
consumer1: %d consumer2: %d, timecost: %d\n",
+               receivedConsumer1, receivedConsumer2, 
time.Since(startTime).Milliseconds(),
+       )
+}
+
 func TestPartitionTopicsConsumerPubSub(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,
@@ -2704,11 +2786,11 @@ func TestKeyBasedBatchProducerConsumerKeyShared(t 
*testing.T) {
        assert.Equal(t, len(consumer1Keys)*MsgBatchCount, receivedConsumer1)
        assert.Equal(t, len(consumer2Keys)*MsgBatchCount, receivedConsumer2)
 
-       t.Logf("TestKeyBasedBatchProducerConsumerKeyShared received messages 
consumer1: %d consumser2: %d\n",
+       t.Logf("TestKeyBasedBatchProducerConsumerKeyShared received messages 
consumer1: %d consumer2: %d\n",
                receivedConsumer1, receivedConsumer2)
        assert.Equal(t, 300, receivedConsumer1+receivedConsumer2)
 
-       t.Logf("TestKeyBasedBatchProducerConsumerKeyShared received messages 
keys consumer1: %v consumser2: %v\n",
+       t.Logf("TestKeyBasedBatchProducerConsumerKeyShared received messages 
keys consumer1: %v consumer2: %v\n",
                consumer1Keys, consumer2Keys)
 }
 
@@ -2887,7 +2969,7 @@ func TestConsumerKeySharedWithOrderingKey(t *testing.T) {
        assert.NotEqual(t, 0, receivedConsumer2)
 
        t.Logf(
-               "TestConsumerKeySharedWithOrderingKey received messages 
consumer1: %d consumser2: %d\n",
+               "TestConsumerKeySharedWithOrderingKey received messages 
consumer1: %d consumer2: %d\n",
                receivedConsumer1, receivedConsumer2,
        )
        assert.Equal(t, 100, receivedConsumer1+receivedConsumer2)

Reply via email to