This is an automated email from the ASF dual-hosted git repository.
crossoverjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 7b4b3a6a test: add unit test for unloading topic before consuming zero
queue messages (#1434)
7b4b3a6a is described below
commit 7b4b3a6a7b512068b5f6c993f5c2c6f4362d34ea
Author: crossoverJie <[email protected]>
AuthorDate: Thu Oct 30 22:42:00 2025 +0800
test: add unit test for unloading topic before consuming zero queue
messages (#1434)
Co-authored-by: Copilot <[email protected]>
---
pulsar/consumer_zero_queue_test.go | 90 +++++++++++++++++++++++++++++++++++++-
1 file changed, 89 insertions(+), 1 deletion(-)
diff --git a/pulsar/consumer_zero_queue_test.go
b/pulsar/consumer_zero_queue_test.go
index 72048d79..444751a1 100644
--- a/pulsar/consumer_zero_queue_test.go
+++ b/pulsar/consumer_zero_queue_test.go
@@ -21,9 +21,12 @@ import (
"context"
"fmt"
"log"
+ "log/slog"
+ "os"
"testing"
"time"
+ plog "github.com/apache/pulsar-client-go/pulsar/log"
"github.com/docker/docker/api/types/container"
"github.com/docker/go-connections/nat"
"github.com/stretchr/testify/require"
@@ -202,9 +205,9 @@ func TestReconnectConsumer(t *testing.T) {
go func() {
time.Sleep(3 * time.Second)
log.Println("unloading topic")
- // Simulate a broker restart by stopping the pulsar container
topicName, err := utils.GetTopicName(topic)
assert.Nil(t, err)
+ // unload topic to trigger consumer reconnect
err = admin.Topics().Unload(*topicName)
assert.Nil(t, err)
log.Println("unloaded topic")
@@ -240,6 +243,91 @@ func TestReconnectConsumer(t *testing.T) {
defer c.Terminate(ctx)
}
+func TestUnloadTopicBeforeConsume(t *testing.T) {
+
+ sLogger := slog.New(slog.NewTextHandler(os.Stdout,
&slog.HandlerOptions{Level: slog.LevelDebug}))
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ Logger: plog.NewLoggerWithSlog(sLogger),
+ })
+ assert.Nil(t, err)
+ admin, err := pulsaradmin.NewClient(&config.Config{})
+ assert.Nil(t, err)
+
+ defer client.Close()
+
+ topic := newTopicName()
+ ctx := context.Background()
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ EnableZeroQueueConsumer: true,
+ })
+
+ assert.Nil(t, err)
+ _, ok := consumer.(*zeroQueueConsumer)
+ assert.True(t, ok)
+ defer consumer.Close()
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: false,
+ })
+ assert.Nil(t, err)
+
+ // send 10 messages
+ for i := 0; i < 10; i++ {
+ msg, err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ Key: "pulsar",
+ Properties: map[string]string{
+ "key-1": "pulsar-1",
+ },
+ })
+ assert.Nil(t, err)
+ log.Printf("send message: %s", msg.String())
+ }
+
+ log.Println("unloading topic")
+ topicName, err := utils.GetTopicName(topic)
+ assert.Nil(t, err)
+ // unload topic to trigger consumer reconnect and send permits
+ err = admin.Topics().Unload(*topicName)
+ assert.Nil(t, err)
+ log.Println("unloaded topic")
+
+ // receive 10 messages
+ for i := 0; i < 10; i++ {
+ msg, err := consumer.Receive(context.Background())
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ expectMsg := fmt.Sprintf("hello-%d", i)
+ expectProperties := map[string]string{
+ "key-1": "pulsar-1",
+ }
+ assert.Equal(t, []byte(expectMsg), msg.Payload())
+ assert.Equal(t, "pulsar", msg.Key())
+ assert.Equal(t, expectProperties, msg.Properties())
+ // ack message
+ err = consumer.Ack(msg)
+ assert.Nil(t, err)
+ log.Printf("receive message: %s", msg.ID().String())
+ }
+ // Make sure there are no more messages
+ timeout, cancel := context.WithTimeout(context.Background(),
2*time.Second)
+ defer cancel()
+ _, err = consumer.Receive(timeout)
+ assert.Equal(t, context.DeadlineExceeded, err)
+
+ err = consumer.Unsubscribe()
+ assert.Nil(t, err)
+ consumer.Close()
+ producer.Close()
+}
+
func TestMultipleConsumer(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,