This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch codex/fix-consumer-ack-partition-lookup
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/codex/fix-consumer-ack-partition-lookup by this push:
     new fc3e776a test: cover ack during partition update with testcontainer
fc3e776a is described below

commit fc3e776a89870956b1ad7b48573df7bac1966f0e
Author: mattisonchao <[email protected]>
AuthorDate: Tue May 12 22:01:44 2026 +0800

    test: cover ack during partition update with testcontainer
---
 pulsar/client.go                            |   3 +-
 pulsar/client_impl.go                       |   2 +-
 pulsar/consumer_test.go                     | 283 ----------------------------
 tests/consumer_ack_partition_update_test.go | 118 ++++++++++++
 tests/pulsar_container_test.go              | 107 +++++++++++
 5 files changed, 228 insertions(+), 285 deletions(-)

diff --git a/pulsar/client.go b/pulsar/client.go
index 0cca9469..4ab34e54 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -181,7 +181,8 @@ type ClientOptions struct {
        // "Pulsar Go 3.0.0-forked".
        Description string
 
-       failureInjectHook FailureInjectHook
+       // FailureInjectHook is used by tests to pause internal client operations at controlled points.
+       FailureInjectHook FailureInjectHook
 }
 
 // FailureInjectHook defines failure injection points used by tests.
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index a62e353c..8a4b8602 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -173,7 +173,7 @@ func newClient(options ClientOptions) (Client, error) {
                memLimit:          internal.NewMemoryLimitController(memLimitBytes, defaultMemoryLimitTriggerThreshold),
                operationTimeout:  operationTimeout,
                tlsEnabled:        tlsConfig != nil,
-               failureInjectHook: options.failureInjectHook,
+               failureInjectHook: options.FailureInjectHook,
        }
 
        c.rpcClient, err = internal.NewRPCClient(options.URL, c.cnxPool, operationTimeout, logger, metrics,
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 2c8114fe..4e232b8a 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -23,7 +23,6 @@ import (
        "fmt"
        "log"
        "net/http"
-       "net/url"
        "os"
        "regexp"
        "strconv"
@@ -5584,288 +5583,6 @@ func TestAckIDList(t *testing.T) {
        }
 }
 
-func TestAckIDDoesNotPanicForNilPartitionConsumer(t *testing.T) {
-       msgID := newMessageID(1, 2, -1, 0, 0)
-       tests := []struct {
-               name      string
-               ack       func(*consumer) error
-               wantError bool
-       }{
-               {
-                       name: "AckID",
-                       ack: func(consumer *consumer) error {
-                               return consumer.AckID(msgID)
-                       },
-                       wantError: true,
-               },
-               {
-                       name: "AckIDCumulative",
-                       ack: func(consumer *consumer) error {
-                               return consumer.AckIDCumulative(msgID)
-                       },
-                       wantError: true,
-               },
-               {
-                       name: "AckIDList",
-                       ack: func(consumer *consumer) error {
-                               return consumer.AckIDList([]MessageID{msgID})
-                       },
-               },
-       }
-
-       for _, tt := range tests {
-               t.Run(tt.name, func(t *testing.T) {
-                       consumer := &consumer{
-                               consumers: []*partitionConsumer{nil},
-                               log:       plog.DefaultNopLogger(),
-                       }
-
-                       var err error
-                       require.NotPanics(t, func() {
-                               err = tt.ack(consumer)
-                       })
-                       if tt.wantError {
-                               require.Error(t, err)
-                       }
-               })
-       }
-}
-
-func TestAckIDWaitsForPartitionConsumerUpdate(t *testing.T) {
-       msgID := newMessageID(1, 2, -1, 0, 0)
-       hookEntered := make(chan struct{})
-       releaseHook := make(chan struct{})
-       clientOptions := ClientOptions{
-               failureInjectHook: &blockingFailureInjectHook{
-                       beforeAssignPartitionConsumersFunc: func() {
-                               close(hookEntered)
-                               <-releaseHook
-                       },
-               },
-       }
-       cons := newConsumerPartitionUpdateTestConsumer(clientOptions)
-
-       updateErrCh := make(chan error, 1)
-       go func() {
-               updateErrCh <- cons.internalTopicSubscribeToPartitions()
-       }()
-
-       select {
-       case <-hookEntered:
-       case err := <-updateErrCh:
-               require.NoError(t, err)
-       case <-time.After(time.Second):
-               t.Fatal("timed out waiting for partition consumer update hook")
-       }
-
-       errCh := make(chan error, 1)
-       go func() {
-               errCh <- cons.AckID(msgID)
-       }()
-
-       select {
-       case err := <-errCh:
-               t.Fatalf("AckID returned while partition consumer update lock was held: %v", err)
-       case <-time.After(50 * time.Millisecond):
-       }
-
-       close(releaseHook)
-
-       select {
-       case err := <-updateErrCh:
-               require.NoError(t, err)
-       case <-time.After(time.Second):
-               t.Fatal("timed out waiting for partition consumer update to finish")
-       }
-
-       defer func() {
-               for _, pc := range cons.consumers {
-                       if pc != nil {
-                               pc.Close()
-                       }
-               }
-       }()
-
-       select {
-       case err := <-errCh:
-               require.NoError(t, err)
-       case <-time.After(time.Second):
-               t.Fatal("timed out waiting for AckID after partition consumer update lock was released")
-       }
-}
-
-func TestClientFailureInjectHookReproducesAckDuringPartitionUpdate(t *testing.T) {
-       hookEntered := make(chan struct{})
-       releaseHook := make(chan struct{})
-       var hookCalls int32
-       req := testcontainers.ContainerRequest{
-               Image:        getPulsarTestImage(),
-               ExposedPorts: []string{"6650/tcp", "8080/tcp"},
-               WaitingFor:   wait.ForListeningPort("6650/tcp").SkipInternalCheck().WithStartupTimeout(2 * time.Minute),
-               Cmd:          []string{"bin/pulsar", "standalone", "-nfw", "--advertised-address", "localhost"},
-       }
-       container, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
-               ContainerRequest: req,
-               Started:          true,
-       })
-       require.NoError(t, err, "Failed to start the pulsar container")
-       defer func() {
-               _ = container.Terminate(context.Background())
-       }()
-
-       endpoint, err := container.PortEndpoint(context.Background(), "6650", "pulsar")
-       require.NoError(t, err, "Failed to get the pulsar endpoint")
-       adminEndpoint, err := container.PortEndpoint(context.Background(), "8080", "http")
-       require.NoError(t, err, "Failed to get the pulsar admin endpoint")
-
-       topic := "persistent://public/default/" + newTopicName()
-       pulsarAdmin, err := pulsaradmin.NewClient(&config.Config{WebServiceURL: adminEndpoint})
-       require.NoError(t, err)
-       topicName, err := utils.GetTopicName(topic)
-       require.NoError(t, err)
-       require.Eventually(t, func() bool {
-               err = pulsarAdmin.Brokers().HealthCheck()
-               return err == nil
-       }, 60*time.Second, time.Second)
-       require.NoError(t, err)
-       require.Eventually(t, func() bool {
-               err = pulsarAdmin.Topics().Create(*topicName, 1)
-               return err == nil
-       }, 60*time.Second, time.Second)
-       require.NoError(t, err)
-
-       client, err := NewClient(ClientOptions{
-               URL:              endpoint,
-               OperationTimeout: 30 * time.Second,
-               failureInjectHook: &blockingFailureInjectHook{
-                       beforeAssignPartitionConsumersFunc: func() {
-                               if atomic.AddInt32(&hookCalls, 1) == 1 {
-                                       return
-                               }
-                               close(hookEntered)
-                               <-releaseHook
-                       },
-               },
-       })
-       require.NoError(t, err)
-       defer client.Close()
-
-       var cons Consumer
-       require.Eventually(t, func() bool {
-               cons, err = client.Subscribe(ConsumerOptions{
-                       Topic:               topic,
-                       SubscriptionName:    "sub",
-                       AutoDiscoveryPeriod: time.Millisecond,
-               })
-               return err == nil
-       }, 30*time.Second, time.Second)
-       defer cons.Close()
-
-       require.Eventually(t, func() bool {
-               err = pulsarAdmin.Topics().Update(*topicName, 2)
-               return err == nil
-       }, 30*time.Second, time.Second)
-       require.NoError(t, err)
-
-       select {
-       case <-hookEntered:
-       case <-time.After(10 * time.Second):
-               t.Fatal("timed out waiting for failure injection hook")
-       }
-
-       ackErrCh := make(chan error, 1)
-       go func() {
-               ackErrCh <- cons.AckID(newMessageID(1, 2, -1, 1, 0))
-       }()
-
-       select {
-       case err := <-ackErrCh:
-               t.Fatalf("AckID returned while partition consumer update was blocked: %v", err)
-       case <-time.After(50 * time.Millisecond):
-       }
-
-       close(releaseHook)
-
-       select {
-       case err := <-ackErrCh:
-               require.NoError(t, err)
-       case <-time.After(10 * time.Second):
-               t.Fatal("timed out waiting for AckID after partition consumer update was released")
-       }
-}
-
-func newConsumerPartitionUpdateTestConsumer(options ClientOptions) *consumer {
-       brokerURL, _ := url.Parse("pulsar://localhost:6650")
-       cnx := newSpyConnection()
-       rpc := &grabConnSpyRPCClient{
-               cnx: cnx,
-               lookupResult: &internal.LookupResult{
-                       LogicalAddr:  brokerURL,
-                       PhysicalAddr: brokerURL,
-               },
-       }
-       client := &client{
-               cnxPool:           &partitionUpdateConnectionPool{cnx: cnx},
-               rpcClient:         rpc,
-               lookupService:     &partitionUpdateLookupService{partitions: 1},
-               failureInjectHook: options.failureInjectHook,
-               log:               plog.DefaultNopLogger(),
-       }
-       return &consumer{
-               topic:  "persistent://public/default/testpartitionupdate",
-               client: client,
-               options: ConsumerOptions{
-                       SubscriptionName:  "sub",
-                       ReceiverQueueSize: 1,
-               },
-               consumers: []*partitionConsumer{},
-               messageCh: make(chan ConsumerMessage, 1),
-               closeCh:   make(chan struct{}),
-               log:       plog.DefaultNopLogger(),
-               metrics:   newTestMetrics(),
-       }
-}
-
-type blockingFailureInjectHook struct {
-       beforeAssignPartitionConsumersFunc func()
-}
-
-func (h *blockingFailureInjectHook) BeforeAssignPartitionConsumers() {
-       h.beforeAssignPartitionConsumersFunc()
-}
-
-type partitionUpdateConnectionPool struct {
-       internal.ConnectionPool
-       cnx internal.Connection
-}
-
-func (p *partitionUpdateConnectionPool) GetConnection(_ *url.URL, _ *url.URL, _ int32) (internal.Connection, error) {
-       return p.cnx, nil
-}
-
-func (p *partitionUpdateConnectionPool) GetConnections() map[string]internal.Connection {
-       return nil
-}
-
-func (p *partitionUpdateConnectionPool) GenerateRoundRobinIndex() int32 {
-       return 0
-}
-
-func (p *partitionUpdateConnectionPool) Close() {}
-
-func (r *grabConnSpyRPCClient) NewConsumerID() uint64 {
-       return 1
-}
-
-type partitionUpdateLookupService struct {
-       internal.LookupService
-       partitions int
-}
-
-func (l *partitionUpdateLookupService) GetPartitionedTopicMetadata(_ string) (*internal.PartitionedTopicMetadata, error) {
-       return &internal.PartitionedTopicMetadata{Partitions: l.partitions}, nil
-}
-
 func getAckCount(registry *prometheus.Registry) (int, error) {
        metrics, err := registry.Gather()
        if err != nil {
diff --git a/tests/consumer_ack_partition_update_test.go b/tests/consumer_ack_partition_update_test.go
new file mode 100644
index 00000000..4c6279e8
--- /dev/null
+++ b/tests/consumer_ack_partition_update_test.go
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tests
+
+import (
+       "sync/atomic"
+       "testing"
+       "time"
+
+       "github.com/apache/pulsar-client-go/pulsar"
+       "github.com/apache/pulsar-client-go/pulsaradmin"
+       "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
+       "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
+       "github.com/google/uuid"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+func TestConsumerAckIDListDuringPartitionUpdate(t *testing.T) {
+       hookEntered := make(chan struct{})
+       releaseHook := make(chan struct{})
+       var hookCalls int32
+
+       topic := "persistent://public/default/ack-update-" + uuid.NewString()
+       pulsarAdmin, err := pulsaradmin.NewClient(&config.Config{WebServiceURL: testPulsar.AdminURL})
+       require.NoError(t, err)
+       topicName, err := utils.GetTopicName(topic)
+       require.NoError(t, err)
+       var lastCreateErr error
+       created := assert.Eventually(t, func() bool {
+               lastCreateErr = pulsarAdmin.Topics().Create(*topicName, 1)
+               return lastCreateErr == nil
+       }, 60*time.Second, time.Second)
+       require.Truef(t, created, "last error: %v", lastCreateErr)
+       require.NoError(t, lastCreateErr)
+
+       client, err := pulsar.NewClient(pulsar.ClientOptions{
+               URL:              testPulsar.BrokerURL,
+               OperationTimeout: 30 * time.Second,
+               FailureInjectHook: &blockingFailureInjectHook{
+                       beforeAssignPartitionConsumersFunc: func() {
+                               if atomic.AddInt32(&hookCalls, 1) == 1 {
+                                       return
+                               }
+                               close(hookEntered)
+                               <-releaseHook
+                       },
+               },
+       })
+       require.NoError(t, err)
+       defer client.Close()
+
+       var consumer pulsar.Consumer
+       require.Eventually(t, func() bool {
+               consumer, err = client.Subscribe(pulsar.ConsumerOptions{
+                       Topic:               topic,
+                       SubscriptionName:    "sub",
+                       AutoDiscoveryPeriod: time.Millisecond,
+               })
+               return err == nil
+       }, 30*time.Second, time.Second)
+       defer consumer.Close()
+
+       require.Eventually(t, func() bool {
+               err = pulsarAdmin.Topics().Update(*topicName, 2)
+               return err == nil
+       }, 30*time.Second, time.Second)
+       require.NoError(t, err)
+
+       select {
+       case <-hookEntered:
+       case <-time.After(10 * time.Second):
+               t.Fatal("timed out waiting for failure injection hook")
+       }
+
+       ackErrCh := make(chan error, 1)
+       go func() {
+               ackErrCh <- consumer.AckIDList([]pulsar.MessageID{pulsar.NewMessageID(1, 2, -1, 1)})
+       }()
+
+       select {
+       case err := <-ackErrCh:
+               t.Fatalf("AckIDList returned while partition consumer update was blocked: %v", err)
+       case <-time.After(50 * time.Millisecond):
+       }
+
+       close(releaseHook)
+
+       select {
+       case err := <-ackErrCh:
+               require.NoError(t, err)
+       case <-time.After(10 * time.Second):
+               t.Fatal("timed out waiting for AckIDList after partition consumer update was released")
+       }
+}
+
+type blockingFailureInjectHook struct {
+       beforeAssignPartitionConsumersFunc func()
+}
+
+func (h *blockingFailureInjectHook) BeforeAssignPartitionConsumers() {
+       h.beforeAssignPartitionConsumersFunc()
+}
diff --git a/tests/pulsar_container_test.go b/tests/pulsar_container_test.go
new file mode 100644
index 00000000..9e10941b
--- /dev/null
+++ b/tests/pulsar_container_test.go
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tests
+
+import (
+       "context"
+       "fmt"
+       "io"
+       "os"
+       "testing"
+       "time"
+
+       "github.com/testcontainers/testcontainers-go"
+       "github.com/testcontainers/testcontainers-go/wait"
+)
+
+type pulsarTestContainer struct {
+       testcontainers.Container
+       BrokerURL string
+       AdminURL  string
+}
+
+var testPulsar *pulsarTestContainer
+
+func TestMain(m *testing.M) {
+       _ = os.Setenv("NO_PROXY", "localhost,127.0.0.1,0.0.0.0")
+       _ = os.Setenv("no_proxy", "localhost,127.0.0.1,0.0.0.0")
+
+       ctx := context.Background()
+       var err error
+       testPulsar, err = startPulsarContainer(ctx)
+       if err != nil {
+               fmt.Fprintf(os.Stderr, "failed to start Pulsar container: %v\n", err)
+               os.Exit(1)
+       }
+
+       code := m.Run()
+       if err := testPulsar.Terminate(ctx); err != nil {
+               fmt.Fprintf(os.Stderr, "failed to terminate Pulsar container: %v\n", err)
+               if code == 0 {
+                       code = 1
+               }
+       }
+       os.Exit(code)
+}
+
+func startPulsarContainer(ctx context.Context) (*pulsarTestContainer, error) {
+       req := testcontainers.ContainerRequest{
+               Image:        pulsarTestImage(),
+               ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+               WaitingFor: wait.ForHTTP("/admin/v2/clusters").
+                       WithPort("8080/tcp").
+                       WithResponseMatcher(func(body io.Reader) bool {
+                               respBytes, _ := io.ReadAll(body)
+                               return string(respBytes) == `["standalone"]`
+                       }).
+                       WithStartupTimeout(2 * time.Minute),
+               Cmd: []string{"bin/pulsar", "standalone", "-nfw", "--advertised-address", "localhost"},
+       }
+       container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
+               ContainerRequest: req,
+               Started:          true,
+       })
+       if err != nil {
+               return nil, err
+       }
+
+       brokerURL, err := container.PortEndpoint(ctx, "6650", "pulsar")
+       if err != nil {
+               _ = container.Terminate(ctx)
+               return nil, err
+       }
+       adminURL, err := container.PortEndpoint(ctx, "8080", "http")
+       if err != nil {
+               _ = container.Terminate(ctx)
+               return nil, err
+       }
+
+       return &pulsarTestContainer{
+               Container: container,
+               BrokerURL: brokerURL,
+               AdminURL:  adminURL,
+       }, nil
+}
+
+func pulsarTestImage() string {
+       image := os.Getenv("PULSAR_IMAGE")
+       if image == "" {
+               image = "apachepulsar/pulsar:latest"
+       }
+       return image
+}

Reply via email to