This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch codex/fix-consumer-ack-partition-lookup
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to
refs/heads/codex/fix-consumer-ack-partition-lookup by this push:
new fc3e776a test: cover ack during partition update with testcontainer
fc3e776a is described below
commit fc3e776a89870956b1ad7b48573df7bac1966f0e
Author: mattisonchao <[email protected]>
AuthorDate: Tue May 12 22:01:44 2026 +0800
test: cover ack during partition update with testcontainer
---
pulsar/client.go | 3 +-
pulsar/client_impl.go | 2 +-
pulsar/consumer_test.go | 283 ----------------------------
tests/consumer_ack_partition_update_test.go | 118 ++++++++++++
tests/pulsar_container_test.go | 107 +++++++++++
5 files changed, 228 insertions(+), 285 deletions(-)
diff --git a/pulsar/client.go b/pulsar/client.go
index 0cca9469..4ab34e54 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -181,7 +181,8 @@ type ClientOptions struct {
// "Pulsar Go 3.0.0-forked".
Description string
- failureInjectHook FailureInjectHook
+ // FailureInjectHook is used by tests to pause internal client
operations at controlled points.
+ FailureInjectHook FailureInjectHook
}
// FailureInjectHook defines failure injection points used by tests.
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index a62e353c..8a4b8602 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -173,7 +173,7 @@ func newClient(options ClientOptions) (Client, error) {
memLimit:
internal.NewMemoryLimitController(memLimitBytes,
defaultMemoryLimitTriggerThreshold),
operationTimeout: operationTimeout,
tlsEnabled: tlsConfig != nil,
- failureInjectHook: options.failureInjectHook,
+ failureInjectHook: options.FailureInjectHook,
}
c.rpcClient, err = internal.NewRPCClient(options.URL, c.cnxPool,
operationTimeout, logger, metrics,
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 2c8114fe..4e232b8a 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -23,7 +23,6 @@ import (
"fmt"
"log"
"net/http"
- "net/url"
"os"
"regexp"
"strconv"
@@ -5584,288 +5583,6 @@ func TestAckIDList(t *testing.T) {
}
}
-func TestAckIDDoesNotPanicForNilPartitionConsumer(t *testing.T) {
- msgID := newMessageID(1, 2, -1, 0, 0)
- tests := []struct {
- name string
- ack func(*consumer) error
- wantError bool
- }{
- {
- name: "AckID",
- ack: func(consumer *consumer) error {
- return consumer.AckID(msgID)
- },
- wantError: true,
- },
- {
- name: "AckIDCumulative",
- ack: func(consumer *consumer) error {
- return consumer.AckIDCumulative(msgID)
- },
- wantError: true,
- },
- {
- name: "AckIDList",
- ack: func(consumer *consumer) error {
- return consumer.AckIDList([]MessageID{msgID})
- },
- },
- }
-
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- consumer := &consumer{
- consumers: []*partitionConsumer{nil},
- log: plog.DefaultNopLogger(),
- }
-
- var err error
- require.NotPanics(t, func() {
- err = tt.ack(consumer)
- })
- if tt.wantError {
- require.Error(t, err)
- }
- })
- }
-}
-
-func TestAckIDWaitsForPartitionConsumerUpdate(t *testing.T) {
- msgID := newMessageID(1, 2, -1, 0, 0)
- hookEntered := make(chan struct{})
- releaseHook := make(chan struct{})
- clientOptions := ClientOptions{
- failureInjectHook: &blockingFailureInjectHook{
- beforeAssignPartitionConsumersFunc: func() {
- close(hookEntered)
- <-releaseHook
- },
- },
- }
- cons := newConsumerPartitionUpdateTestConsumer(clientOptions)
-
- updateErrCh := make(chan error, 1)
- go func() {
- updateErrCh <- cons.internalTopicSubscribeToPartitions()
- }()
-
- select {
- case <-hookEntered:
- case err := <-updateErrCh:
- require.NoError(t, err)
- case <-time.After(time.Second):
- t.Fatal("timed out waiting for partition consumer update hook")
- }
-
- errCh := make(chan error, 1)
- go func() {
- errCh <- cons.AckID(msgID)
- }()
-
- select {
- case err := <-errCh:
- t.Fatalf("AckID returned while partition consumer update lock
was held: %v", err)
- case <-time.After(50 * time.Millisecond):
- }
-
- close(releaseHook)
-
- select {
- case err := <-updateErrCh:
- require.NoError(t, err)
- case <-time.After(time.Second):
- t.Fatal("timed out waiting for partition consumer update to
finish")
- }
-
- defer func() {
- for _, pc := range cons.consumers {
- if pc != nil {
- pc.Close()
- }
- }
- }()
-
- select {
- case err := <-errCh:
- require.NoError(t, err)
- case <-time.After(time.Second):
- t.Fatal("timed out waiting for AckID after partition consumer
update lock was released")
- }
-}
-
-func TestClientFailureInjectHookReproducesAckDuringPartitionUpdate(t
*testing.T) {
- hookEntered := make(chan struct{})
- releaseHook := make(chan struct{})
- var hookCalls int32
- req := testcontainers.ContainerRequest{
- Image: getPulsarTestImage(),
- ExposedPorts: []string{"6650/tcp", "8080/tcp"},
- WaitingFor:
wait.ForListeningPort("6650/tcp").SkipInternalCheck().WithStartupTimeout(2 *
time.Minute),
- Cmd: []string{"bin/pulsar", "standalone", "-nfw",
"--advertised-address", "localhost"},
- }
- container, err := testcontainers.GenericContainer(context.Background(),
testcontainers.GenericContainerRequest{
- ContainerRequest: req,
- Started: true,
- })
- require.NoError(t, err, "Failed to start the pulsar container")
- defer func() {
- _ = container.Terminate(context.Background())
- }()
-
- endpoint, err := container.PortEndpoint(context.Background(), "6650",
"pulsar")
- require.NoError(t, err, "Failed to get the pulsar endpoint")
- adminEndpoint, err := container.PortEndpoint(context.Background(),
"8080", "http")
- require.NoError(t, err, "Failed to get the pulsar admin endpoint")
-
- topic := "persistent://public/default/" + newTopicName()
- pulsarAdmin, err := pulsaradmin.NewClient(&config.Config{WebServiceURL:
adminEndpoint})
- require.NoError(t, err)
- topicName, err := utils.GetTopicName(topic)
- require.NoError(t, err)
- require.Eventually(t, func() bool {
- err = pulsarAdmin.Brokers().HealthCheck()
- return err == nil
- }, 60*time.Second, time.Second)
- require.NoError(t, err)
- require.Eventually(t, func() bool {
- err = pulsarAdmin.Topics().Create(*topicName, 1)
- return err == nil
- }, 60*time.Second, time.Second)
- require.NoError(t, err)
-
- client, err := NewClient(ClientOptions{
- URL: endpoint,
- OperationTimeout: 30 * time.Second,
- failureInjectHook: &blockingFailureInjectHook{
- beforeAssignPartitionConsumersFunc: func() {
- if atomic.AddInt32(&hookCalls, 1) == 1 {
- return
- }
- close(hookEntered)
- <-releaseHook
- },
- },
- })
- require.NoError(t, err)
- defer client.Close()
-
- var cons Consumer
- require.Eventually(t, func() bool {
- cons, err = client.Subscribe(ConsumerOptions{
- Topic: topic,
- SubscriptionName: "sub",
- AutoDiscoveryPeriod: time.Millisecond,
- })
- return err == nil
- }, 30*time.Second, time.Second)
- defer cons.Close()
-
- require.Eventually(t, func() bool {
- err = pulsarAdmin.Topics().Update(*topicName, 2)
- return err == nil
- }, 30*time.Second, time.Second)
- require.NoError(t, err)
-
- select {
- case <-hookEntered:
- case <-time.After(10 * time.Second):
- t.Fatal("timed out waiting for failure injection hook")
- }
-
- ackErrCh := make(chan error, 1)
- go func() {
- ackErrCh <- cons.AckID(newMessageID(1, 2, -1, 1, 0))
- }()
-
- select {
- case err := <-ackErrCh:
- t.Fatalf("AckID returned while partition consumer update was
blocked: %v", err)
- case <-time.After(50 * time.Millisecond):
- }
-
- close(releaseHook)
-
- select {
- case err := <-ackErrCh:
- require.NoError(t, err)
- case <-time.After(10 * time.Second):
- t.Fatal("timed out waiting for AckID after partition consumer
update was released")
- }
-}
-
-func newConsumerPartitionUpdateTestConsumer(options ClientOptions) *consumer {
- brokerURL, _ := url.Parse("pulsar://localhost:6650")
- cnx := newSpyConnection()
- rpc := &grabConnSpyRPCClient{
- cnx: cnx,
- lookupResult: &internal.LookupResult{
- LogicalAddr: brokerURL,
- PhysicalAddr: brokerURL,
- },
- }
- client := &client{
- cnxPool: &partitionUpdateConnectionPool{cnx: cnx},
- rpcClient: rpc,
- lookupService: &partitionUpdateLookupService{partitions: 1},
- failureInjectHook: options.failureInjectHook,
- log: plog.DefaultNopLogger(),
- }
- return &consumer{
- topic: "persistent://public/default/testpartitionupdate",
- client: client,
- options: ConsumerOptions{
- SubscriptionName: "sub",
- ReceiverQueueSize: 1,
- },
- consumers: []*partitionConsumer{},
- messageCh: make(chan ConsumerMessage, 1),
- closeCh: make(chan struct{}),
- log: plog.DefaultNopLogger(),
- metrics: newTestMetrics(),
- }
-}
-
-type blockingFailureInjectHook struct {
- beforeAssignPartitionConsumersFunc func()
-}
-
-func (h *blockingFailureInjectHook) BeforeAssignPartitionConsumers() {
- h.beforeAssignPartitionConsumersFunc()
-}
-
-type partitionUpdateConnectionPool struct {
- internal.ConnectionPool
- cnx internal.Connection
-}
-
-func (p *partitionUpdateConnectionPool) GetConnection(_ *url.URL, _ *url.URL,
_ int32) (internal.Connection, error) {
- return p.cnx, nil
-}
-
-func (p *partitionUpdateConnectionPool) GetConnections()
map[string]internal.Connection {
- return nil
-}
-
-func (p *partitionUpdateConnectionPool) GenerateRoundRobinIndex() int32 {
- return 0
-}
-
-func (p *partitionUpdateConnectionPool) Close() {}
-
-func (r *grabConnSpyRPCClient) NewConsumerID() uint64 {
- return 1
-}
-
-type partitionUpdateLookupService struct {
- internal.LookupService
- partitions int
-}
-
-func (l *partitionUpdateLookupService) GetPartitionedTopicMetadata(_ string)
(*internal.PartitionedTopicMetadata, error) {
- return &internal.PartitionedTopicMetadata{Partitions: l.partitions}, nil
-}
-
func getAckCount(registry *prometheus.Registry) (int, error) {
metrics, err := registry.Gather()
if err != nil {
diff --git a/tests/consumer_ack_partition_update_test.go
b/tests/consumer_ack_partition_update_test.go
new file mode 100644
index 00000000..4c6279e8
--- /dev/null
+++ b/tests/consumer_ack_partition_update_test.go
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tests
+
+import (
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/apache/pulsar-client-go/pulsar"
+ "github.com/apache/pulsar-client-go/pulsaradmin"
+ "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
+ "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
+ "github.com/google/uuid"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+// TestConsumerAckIDListDuringPartitionUpdate verifies that AckIDList does not
+// return while the consumer is parked inside a partition-consumer update, and
+// that it succeeds once the update is allowed to finish.
+//
+// The FailureInjectHook lets the first BeforeAssignPartitionConsumers call
+// (made while subscribing) pass through and parks the second one, which is
+// expected to come from the update triggered by growing the topic from 1 to
+// 2 partitions, until releaseHook is closed.
+func TestConsumerAckIDListDuringPartitionUpdate(t *testing.T) {
+	hookEntered := make(chan struct{})
+	releaseHook := make(chan struct{})
+	var hookCalls int32
+
+	// released records whether releaseHook has been closed. release is only
+	// called from the test goroutine (directly or via defer), so no
+	// synchronization is needed and the channel is closed at most once.
+	released := false
+	release := func() {
+		if !released {
+			released = true
+			close(releaseHook)
+		}
+	}
+
+	topic := "persistent://public/default/ack-update-" + uuid.NewString()
+	pulsarAdmin, err := pulsaradmin.NewClient(&config.Config{WebServiceURL: testPulsar.AdminURL})
+	require.NoError(t, err)
+	topicName, err := utils.GetTopicName(topic)
+	require.NoError(t, err)
+	requireEventuallyNoError(t, 60*time.Second, time.Second, func() error {
+		return pulsarAdmin.Topics().Create(*topicName, 1)
+	})
+
+	client, err := pulsar.NewClient(pulsar.ClientOptions{
+		URL:              testPulsar.BrokerURL,
+		OperationTimeout: 30 * time.Second,
+		FailureInjectHook: &blockingFailureInjectHook{
+			beforeAssignPartitionConsumersFunc: func() {
+				// Park only the second call. The first comes from the initial
+				// subscribe; any later call must pass through rather than
+				// close hookEntered a second time (which would panic).
+				if atomic.AddInt32(&hookCalls, 1) != 2 {
+					return
+				}
+				close(hookEntered)
+				<-releaseHook
+			},
+		},
+	})
+	require.NoError(t, err)
+	defer client.Close()
+
+	var consumer pulsar.Consumer
+	requireEventuallyNoError(t, 30*time.Second, time.Second, func() error {
+		var subscribeErr error
+		consumer, subscribeErr = client.Subscribe(pulsar.ConsumerOptions{
+			Topic:               topic,
+			SubscriptionName:    "sub",
+			AutoDiscoveryPeriod: time.Millisecond,
+		})
+		return subscribeErr
+	})
+	defer consumer.Close()
+	// Registered after the Close defers so it runs before them: if the test
+	// fails while the hook is parked, unblock it. Otherwise that goroutine
+	// never exits, and Close may end up waiting on the parked update.
+	defer release()
+
+	requireEventuallyNoError(t, 30*time.Second, time.Second, func() error {
+		return pulsarAdmin.Topics().Update(*topicName, 2)
+	})
+
+	select {
+	case <-hookEntered:
+	case <-time.After(10 * time.Second):
+		t.Fatal("timed out waiting for failure injection hook")
+	}
+
+	ackErrCh := make(chan error, 1)
+	go func() {
+		// Partition index 1 is only created by the update above.
+		ackErrCh <- consumer.AckIDList([]pulsar.MessageID{pulsar.NewMessageID(1, 2, -1, 1)})
+	}()
+
+	select {
+	case err := <-ackErrCh:
+		t.Fatalf("AckIDList returned while partition consumer update was blocked: %v", err)
+	case <-time.After(50 * time.Millisecond):
+	}
+
+	release()
+
+	select {
+	case err := <-ackErrCh:
+		require.NoError(t, err)
+	case <-time.After(10 * time.Second):
+		t.Fatal("timed out waiting for AckIDList after partition consumer update was released")
+	}
+}
+
+// requireEventuallyNoError calls fn every tick until it returns nil. If
+// waitFor elapses first, it fails the test immediately and reports the last
+// error fn returned.
+func requireEventuallyNoError(t *testing.T, waitFor, tick time.Duration, fn func() error) {
+	t.Helper()
+	var lastErr error
+	ok := assert.Eventually(t, func() bool {
+		lastErr = fn()
+		return lastErr == nil
+	}, waitFor, tick)
+	require.Truef(t, ok, "last error: %v", lastErr)
+}
+
+// blockingFailureInjectHook is a pulsar.FailureInjectHook whose
+// BeforeAssignPartitionConsumers behavior is supplied by the test.
+type blockingFailureInjectHook struct {
+	// beforeAssignPartitionConsumersFunc runs on every
+	// BeforeAssignPartitionConsumers call; a nil func makes the hook a no-op.
+	beforeAssignPartitionConsumersFunc func()
+}
+
+// Compile-time check that the hook satisfies the client's hook interface.
+var _ pulsar.FailureInjectHook = (*blockingFailureInjectHook)(nil)
+
+// BeforeAssignPartitionConsumers invokes the configured callback, if any.
+func (h *blockingFailureInjectHook) BeforeAssignPartitionConsumers() {
+	if h.beforeAssignPartitionConsumersFunc != nil {
+		h.beforeAssignPartitionConsumersFunc()
+	}
+}
diff --git a/tests/pulsar_container_test.go b/tests/pulsar_container_test.go
new file mode 100644
index 00000000..9e10941b
--- /dev/null
+++ b/tests/pulsar_container_test.go
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tests
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "os"
+ "testing"
+ "time"
+
+ "github.com/testcontainers/testcontainers-go"
+ "github.com/testcontainers/testcontainers-go/wait"
+)
+
+// pulsarTestContainer is a running Pulsar standalone container together with
+// the host-side endpoints mapped for its broker and admin ports.
+type pulsarTestContainer struct {
+	testcontainers.Container
+
+	// BrokerURL is the pulsar:// endpoint mapped from container port 6650.
+	BrokerURL string
+	// AdminURL is the http:// endpoint mapped from container port 8080.
+	AdminURL string
+}
+
+// testPulsar is the shared container started by TestMain for this package.
+var testPulsar *pulsarTestContainer
+
+// TestMain starts one shared Pulsar standalone container for the package,
+// runs the tests against it, and then terminates it. A failure to start
+// aborts with exit code 1. A failure to terminate turns an otherwise
+// successful run into exit code 1.
+func TestMain(m *testing.M) {
+	// Exclude local addresses from proxying, for clients that honor NO_PROXY.
+	const localHosts = "localhost,127.0.0.1,0.0.0.0"
+	for _, key := range []string{"NO_PROXY", "no_proxy"} {
+		_ = os.Setenv(key, localHosts)
+	}
+
+	ctx := context.Background()
+	started, startErr := startPulsarContainer(ctx)
+	testPulsar = started
+	if startErr != nil {
+		fmt.Fprintf(os.Stderr, "failed to start Pulsar container: %v\n", startErr)
+		os.Exit(1)
+	}
+
+	exitCode := m.Run()
+	if termErr := testPulsar.Terminate(ctx); termErr != nil {
+		fmt.Fprintf(os.Stderr, "failed to terminate Pulsar container: %v\n", termErr)
+		if exitCode == 0 {
+			exitCode = 1
+		}
+	}
+	os.Exit(exitCode)
+}
+
+// startPulsarContainer launches a Pulsar standalone container and returns it
+// with its host-mapped broker (pulsar://) and admin (http://) endpoints.
+//
+// The container counts as ready once GET /admin/v2/clusters on port 8080
+// returns exactly ["standalone"]. Startup waits at most two minutes. On any
+// error, a container that was already created is terminated before
+// returning, so a failed start does not leave it running.
+func startPulsarContainer(ctx context.Context) (*pulsarTestContainer, error) {
+	req := testcontainers.ContainerRequest{
+		Image:        pulsarTestImage(),
+		ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+		WaitingFor: wait.ForHTTP("/admin/v2/clusters").
+			WithPort("8080/tcp").
+			WithResponseMatcher(func(body io.Reader) bool {
+				respBytes, err := io.ReadAll(body)
+				return err == nil && string(respBytes) == `["standalone"]`
+			}).
+			WithStartupTimeout(2 * time.Minute),
+		Cmd: []string{"bin/pulsar", "standalone", "-nfw", "--advertised-address", "localhost"},
+	}
+	container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
+		ContainerRequest: req,
+		Started:          true,
+	})
+	if err != nil {
+		// testcontainers-go may return a created container together with an
+		// error (e.g. when the wait strategy times out); terminate it so it
+		// is not leaked. Best effort: the start error is what gets reported.
+		if container != nil {
+			_ = container.Terminate(ctx)
+		}
+		return nil, fmt.Errorf("run image %q: %w", req.Image, err)
+	}
+
+	brokerURL, err := container.PortEndpoint(ctx, "6650", "pulsar")
+	if err != nil {
+		_ = container.Terminate(ctx) // best-effort cleanup; report the mapping error
+		return nil, fmt.Errorf("map broker port 6650: %w", err)
+	}
+	adminURL, err := container.PortEndpoint(ctx, "8080", "http")
+	if err != nil {
+		_ = container.Terminate(ctx) // best-effort cleanup; report the mapping error
+		return nil, fmt.Errorf("map admin port 8080: %w", err)
+	}
+
+	return &pulsarTestContainer{
+		Container: container,
+		BrokerURL: brokerURL,
+		AdminURL:  adminURL,
+	}, nil
+}
+
+// pulsarTestImage returns the Pulsar image to run: the value of the
+// PULSAR_IMAGE environment variable when it is non-empty, otherwise
+// apachepulsar/pulsar:latest.
+func pulsarTestImage() string {
+	if fromEnv := os.Getenv("PULSAR_IMAGE"); fromEnv != "" {
+		return fromEnv
+	}
+	return "apachepulsar/pulsar:latest"
+}