This is an automated email from the ASF dual-hosted git repository.

crossoverjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b4b3a6a test: add unit test for unloading topic before consuming zero 
queue messages (#1434)
7b4b3a6a is described below

commit 7b4b3a6a7b512068b5f6c993f5c2c6f4362d34ea
Author: crossoverJie <[email protected]>
AuthorDate: Thu Oct 30 22:42:00 2025 +0800

    test: add unit test for unloading topic before consuming zero queue 
messages (#1434)
    
    Co-authored-by: Copilot <[email protected]>
---
 pulsar/consumer_zero_queue_test.go | 90 +++++++++++++++++++++++++++++++++++++-
 1 file changed, 89 insertions(+), 1 deletion(-)

diff --git a/pulsar/consumer_zero_queue_test.go 
b/pulsar/consumer_zero_queue_test.go
index 72048d79..444751a1 100644
--- a/pulsar/consumer_zero_queue_test.go
+++ b/pulsar/consumer_zero_queue_test.go
@@ -21,9 +21,12 @@ import (
        "context"
        "fmt"
        "log"
+       "log/slog"
+       "os"
        "testing"
        "time"
 
+       plog "github.com/apache/pulsar-client-go/pulsar/log"
        "github.com/docker/docker/api/types/container"
        "github.com/docker/go-connections/nat"
        "github.com/stretchr/testify/require"
@@ -202,9 +205,9 @@ func TestReconnectConsumer(t *testing.T) {
        go func() {
                time.Sleep(3 * time.Second)
                log.Println("unloading topic")
-               // Simulate a broker restart by stopping the pulsar container
                topicName, err := utils.GetTopicName(topic)
                assert.Nil(t, err)
+               // unload topic to trigger consumer reconnect
                err = admin.Topics().Unload(*topicName)
                assert.Nil(t, err)
                log.Println("unloaded topic")
@@ -240,6 +243,91 @@ func TestReconnectConsumer(t *testing.T) {
        defer c.Terminate(ctx)
 }
 
+func TestUnloadTopicBeforeConsume(t *testing.T) {
+
+       sLogger := slog.New(slog.NewTextHandler(os.Stdout, 
&slog.HandlerOptions{Level: slog.LevelDebug}))
+       client, err := NewClient(ClientOptions{
+               URL:    lookupURL,
+               Logger: plog.NewLoggerWithSlog(sLogger),
+       })
+       assert.Nil(t, err)
+       admin, err := pulsaradmin.NewClient(&config.Config{})
+       assert.Nil(t, err)
+
+       defer client.Close()
+
+       topic := newTopicName()
+       ctx := context.Background()
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:                   topic,
+               SubscriptionName:        "my-sub",
+               EnableZeroQueueConsumer: true,
+       })
+
+       assert.Nil(t, err)
+       _, ok := consumer.(*zeroQueueConsumer)
+       assert.True(t, ok)
+       defer consumer.Close()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: false,
+       })
+       assert.Nil(t, err)
+
+       // send 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+                       Key:     "pulsar",
+                       Properties: map[string]string{
+                               "key-1": "pulsar-1",
+                       },
+               })
+               assert.Nil(t, err)
+               log.Printf("send message: %s", msg.String())
+       }
+
+       log.Println("unloading topic")
+       topicName, err := utils.GetTopicName(topic)
+       assert.Nil(t, err)
+       // unload topic to trigger consumer reconnect and send permits
+       err = admin.Topics().Unload(*topicName)
+       assert.Nil(t, err)
+       log.Println("unloaded topic")
+
+       // receive 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := consumer.Receive(context.Background())
+               if err != nil {
+                       log.Fatal(err)
+               }
+
+               expectMsg := fmt.Sprintf("hello-%d", i)
+               expectProperties := map[string]string{
+                       "key-1": "pulsar-1",
+               }
+               assert.Equal(t, []byte(expectMsg), msg.Payload())
+               assert.Equal(t, "pulsar", msg.Key())
+               assert.Equal(t, expectProperties, msg.Properties())
+               // ack message
+               err = consumer.Ack(msg)
+               assert.Nil(t, err)
+               log.Printf("receive message: %s", msg.ID().String())
+       }
+       // Make sure there are no more messages
+       timeout, cancel := context.WithTimeout(context.Background(), 
2*time.Second)
+       defer cancel()
+       _, err = consumer.Receive(timeout)
+       assert.Equal(t, context.DeadlineExceeded, err)
+
+       err = consumer.Unsubscribe()
+       assert.Nil(t, err)
+       consumer.Close()
+       producer.Close()
+}
+
 func TestMultipleConsumer(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,

Reply via email to