This is an automated email from the ASF dual-hosted git repository.

nodece pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new a1765ef9 [fix][consumer] Add reconnect failure listener and auto-close on max retry exhaustion (#1490)
a1765ef9 is described below

commit a1765ef9c22a4cbd7b7a4704b6136e96860b3815
Author: Pavel Zeger <[email protected]>
AuthorDate: Wed May 20 10:30:19 2026 +0300

    [fix][consumer] Add reconnect failure listener and auto-close on max retry exhaustion (#1490)
    
    * [fix][consumer] Add reconnect failure listener and auto-close on max retry exhaustion
    
    * [fix][consumer] Fix consumer_partition.go review comment
    
    * [fix][consumer] Fix consumer.go review comment
    
    * [fix][consumer] Fix consumer_test.go review comment
    
    * [fix][consumer] Fix according to Java client logic
    
    * [fix][consumer] Fix according to Java client logic
    
    * Apply suggestions from code review
    
    Co-authored-by: Zixuan Liu <[email protected]>
    
    * [fix][consumer] Fix review comments
    
    ---------
    
    Co-authored-by: Zixuan Liu <[email protected]>
---
 pulsar/consumer.go             |   5 ++
 pulsar/consumer_impl.go        |   9 +++
 pulsar/consumer_interceptor.go |  17 ++++++
 pulsar/consumer_partition.go   |  74 ++++++++++++++++++++--
 pulsar/consumer_test.go        | 136 +++++++++++++++++++++++++++++++++++++++++
 pulsar/consumer_zero_queue.go  |   7 +++
 6 files changed, 243 insertions(+), 5 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index f7005543..9aeabb03 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -240,6 +240,11 @@ type ConsumerOptions struct {
        Schema Schema
 
        // MaxReconnectToBroker sets the maximum retry number of reconnectToBroker. (default: ultimate)
+       // When the retry budget is exhausted, or when the broker reports a non-retriable error
+       // (e.g. AuthorizationError, TopicNotFound, TopicTerminated, IncompatibleSchema), the
+       // consumer is closed. Applications can observe the close — and recover the cause as the
+       // err argument — by registering a ConsumerInterceptor that also implements
+       // ConsumerCloseInterceptor.
        MaxReconnectToBroker *uint
 
        // BackOffPolicyFunc parameterize the following options in the reconnection logic to
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 5579d374..d76108a2 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -713,6 +713,14 @@ func (c *consumer) NackID(msgID MessageID) {
 }
 
 func (c *consumer) Close() {
+       c.closeWithCause(nil)
+}
+
+// closeWithCause closes the consumer and notifies any ConsumerCloseInterceptor
+// with the supplied cause. The hook fires exactly once per consumer; the cause
+// is captured by the goroutine that wins closeOnce so concurrent callers cannot
+// race the value.
+func (c *consumer) closeWithCause(err error) {
        c.closeOnce.Do(func() {
                c.stopDiscovery()
 
@@ -734,6 +742,7 @@ func (c *consumer) Close() {
                c.rlq.close()
                c.metrics.ConsumersClosed.Inc()
                c.metrics.ConsumersPartitions.Sub(float64(len(c.consumers)))
+               c.options.Interceptors.OnConsumerClose(c, err)
        })
 }
 
diff --git a/pulsar/consumer_interceptor.go b/pulsar/consumer_interceptor.go
index db46b784..4ad6546c 100644
--- a/pulsar/consumer_interceptor.go
+++ b/pulsar/consumer_interceptor.go
@@ -28,6 +28,15 @@ type ConsumerInterceptor interface {
        OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID)
 }
 
+// ConsumerCloseInterceptor is an optional interface that a ConsumerInterceptor
+// may also implement to be notified once the consumer has been closed. The hook
+// fires exactly once per consumer. When err is nil, the close was initiated by
+// the user; a non-nil err carries the cause that triggered an internal close
+// (for example, exhausting MaxReconnectToBroker or a non-retriable broker error).
+type ConsumerCloseInterceptor interface {
+       OnConsumerClose(consumer Consumer, err error)
+}
+
 type ConsumerInterceptors []ConsumerInterceptor
 
 func (x ConsumerInterceptors) BeforeConsume(message ConsumerMessage) {
@@ -48,4 +57,12 @@ func (x ConsumerInterceptors) OnNegativeAcksSend(consumer 
Consumer, msgIDs []Mes
        }
 }
 
+func (x ConsumerInterceptors) OnConsumerClose(consumer Consumer, err error) {
+       for i := range x {
+               if c, ok := x[i].(ConsumerCloseInterceptor); ok {
+                       c.OnConsumerClose(consumer, err)
+               }
+       }
+}
+
 var defaultConsumerInterceptors = make(ConsumerInterceptors, 0)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 8c6d8916..d7aba524 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -93,6 +93,56 @@ const (
        noMessageEntry = -1
 )
 
+// Broker error markers. When the broker reports any of these in response
+// to a Subscribe command, retrying will not recover — the consumer must give up and notify
+// the application. errMsgTopicNotFound and errMsgTopicTerminated are reused from the producer
+// path; the rest are consumer-specific.
+const (
+       errMsgConsumerSubscriptionNotFound = "SubscriptionNotFound"
+       errMsgConsumerAuthorizationError   = "AuthorizationError"
+       errMsgConsumerBusy                 = "ConsumerBusy"
+       errMsgConsumerInvalidTopicName     = "InvalidTopicName"
+       errMsgConsumerIncompatibleSchema   = "IncompatibleSchema"
+       errMsgConsumerAssignError          = "ConsumerAssignError"
+       errMsgConsumerNotAllowedError      = "NotAllowedError"
+)
+
+// nonRetriableSubscribeErrorMarkers is the full set of broker error names that should
+// terminate the reconnect loop immediately. Detection is by substring match on the
+// error message, since the broker error arrives wire-formatted as 
"<ServerError>: <msg>"
+// (see grabConn -> BaseCommand_ERROR handling).
+var nonRetriableSubscribeErrorMarkers = []string{
+       errMsgTopicNotFound,
+       errMsgTopicTerminated,
+       errMsgConsumerSubscriptionNotFound,
+       errMsgConsumerAuthorizationError,
+       errMsgConsumerBusy,
+       errMsgConsumerInvalidTopicName,
+       errMsgConsumerIncompatibleSchema,
+       errMsgConsumerAssignError,
+       errMsgConsumerNotAllowedError,
+}
+
+func isNonRetriableSubscribeError(err error) bool {
+       if err == nil {
+               return false
+       }
+       msg := err.Error()
+       for _, marker := range nonRetriableSubscribeErrorMarkers {
+               if strings.Contains(msg, marker) {
+                       return true
+               }
+       }
+       return false
+}
+
+// causalCloser is implemented by Consumer wrappers that can record the reason
+// they were closed and forward it to a ConsumerCloseInterceptor. The reconnect
+// loop uses this to surface the underlying error when it gives up.
+type causalCloser interface {
+       closeWithCause(err error)
+}
+
 type partitionConsumerOpts struct {
        topic                       string
        consumerName                string
@@ -2062,6 +2112,19 @@ func (pc *partitionConsumer) 
reconnectToBroker(connectionClosed *connectionClose
                assignedBrokerURL = connectionClosed.assignedBrokerURL
        }
 
+       var giveUpNotified bool
+       notifyReconnectGiveUp := func(cause error) {
+               if giveUpNotified {
+                       return
+               }
+               giveUpNotified = true
+               if cc, ok := pc.parentConsumer.(causalCloser); ok {
+                       go cc.closeWithCause(cause)
+               } else {
+                       go pc.parentConsumer.Close()
+               }
+       }
+
        opFn := func() (struct{}, error) {
                if maxRetry == 0 {
                        return struct{}{}, nil
@@ -2088,10 +2151,9 @@ func (pc *partitionConsumer) 
reconnectToBroker(connectionClosed *connectionClose
                        return struct{}{}, nil
                }
                pc.log.WithError(err).Error("Failed to create consumer at 
reconnect")
-               errMsg := err.Error()
-               if strings.Contains(errMsg, errMsgTopicNotFound) {
-                       // when topic is deleted, we should give up 
reconnection.
-                       pc.log.Warn("Topic Not Found.")
+               if isNonRetriableSubscribeError(err) {
+                       pc.log.WithError(err).Warn("Non-retriable error during 
reconnect, giving up")
+                       notifyReconnectGiveUp(err)
                        return struct{}{}, nil
                }
 
@@ -2099,8 +2161,10 @@ func (pc *partitionConsumer) 
reconnectToBroker(connectionClosed *connectionClose
                        maxRetry--
                }
                pc.metrics.ConsumersReconnectFailure.Inc()
-               if maxRetry == 0 || bo.IsMaxBackoffReached() {
+               if maxRetry == 0 {
                        pc.metrics.ConsumersReconnectMaxRetry.Inc()
+                       notifyReconnectGiveUp(errors.New("max retry attempts 
reached for reconnecting to broker"))
+                       return struct{}{}, nil
                }
 
                return struct{}{}, err
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 4e2cbc4a..e1964287 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -5927,3 +5927,139 @@ func TestSelectConnectionForSameConsumer(t *testing.T) {
                        "The consumer uses a different connection when 
reconnecting")
        }
 }
+
+// closeInterceptor captures the (consumer, err) pair delivered to
+// ConsumerCloseInterceptor.OnConsumerClose and signals via fired.
+type closeInterceptor struct {
+       fired    chan struct{}
+       consumer Consumer
+       err      error
+       once     sync.Once
+}
+
+func (c *closeInterceptor) BeforeConsume(_ ConsumerMessage)              {}
+func (c *closeInterceptor) OnAcknowledge(_ Consumer, _ MessageID)        {}
+func (c *closeInterceptor) OnNegativeAcksSend(_ Consumer, _ []MessageID) {}
+func (c *closeInterceptor) OnConsumerClose(consumer Consumer, err error) {
+       c.once.Do(func() {
+               c.consumer = consumer
+               c.err = err
+               close(c.fired)
+       })
+}
+
+func TestConsumerOnCloseInterceptorOnMaxReconnect(t *testing.T) {
+       req := testcontainers.ContainerRequest{
+               Image:        getPulsarTestImage(),
+               ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+               WaitingFor:   wait.ForExposedPort(),
+               Cmd:          []string{"bin/pulsar", "standalone", "-nfw", 
"--advertised-address", "localhost"},
+       }
+       c, err := testcontainers.GenericContainer(context.Background(), 
testcontainers.GenericContainerRequest{
+               ContainerRequest: req,
+               Started:          true,
+       })
+       require.NoError(t, err)
+       t.Cleanup(func() {
+               if err := c.Terminate(context.Background()); err != nil {
+                       t.Logf("container terminate (cleanup) returned: %v", 
err)
+               }
+       })
+       endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+       require.NoError(t, err)
+
+       pulsarClient, err := NewClient(ClientOptions{
+               URL:               endpoint,
+               ConnectionTimeout: 3 * time.Second,
+               OperationTimeout:  5 * time.Second,
+       })
+       require.NoError(t, err)
+       defer pulsarClient.Close()
+
+       maxRetry := uint(1)
+       interceptor := &closeInterceptor{fired: make(chan struct{})}
+
+       topic := newTopicName()
+       var testConsumer Consumer
+       require.Eventually(t, func() bool {
+               testConsumer, err = pulsarClient.Subscribe(ConsumerOptions{
+                       Topic:                topic,
+                       SubscriptionName:     "test-on-close-interceptor",
+                       MaxReconnectToBroker: &maxRetry,
+                       BackOffPolicyFunc: func() backoff.Policy {
+                               return 
newTestBackoffPolicy(100*time.Millisecond, 1*time.Second)
+                       },
+                       Interceptors: ConsumerInterceptors{interceptor},
+               })
+               return err == nil
+       }, 30*time.Second, 1*time.Second)
+       defer testConsumer.Close()
+
+       require.NoError(t, c.Terminate(context.Background()))
+
+       select {
+       case <-interceptor.fired:
+       case <-time.After(30 * time.Second):
+               t.Fatal("OnConsumerClose was not called within timeout")
+       }
+
+       assert.NotNil(t, interceptor.err, "interceptor should receive the cause 
of the close")
+       assert.Equal(t, testConsumer, interceptor.consumer, "interceptor should 
receive the parent consumer")
+
+       pc := testConsumer.(*consumer).consumers[0]
+       require.Eventually(t, func() bool {
+               return pc.getConsumerState() == consumerClosed
+       }, 30*time.Second, 100*time.Millisecond, "consumer should be closed 
after exhausting max reconnect retries")
+}
+
+func TestConsumerOnCloseInterceptorOnUserClose(t *testing.T) {
+       client, err := NewClient(ClientOptions{URL: serviceURL})
+       require.NoError(t, err)
+       defer client.Close()
+
+       interceptor := &closeInterceptor{fired: make(chan struct{})}
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            newTopicName(),
+               SubscriptionName: "test-on-close-user",
+               Interceptors:     ConsumerInterceptors{interceptor},
+       })
+       require.NoError(t, err)
+
+       consumer.Close()
+
+       select {
+       case <-interceptor.fired:
+       case <-time.After(5 * time.Second):
+               t.Fatal("OnConsumerClose was not called within timeout")
+       }
+
+       assert.Nil(t, interceptor.err, "user-initiated close should report nil 
cause")
+       assert.Equal(t, consumer, interceptor.consumer)
+}
+
+func TestIsNonRetriableSubscribeError(t *testing.T) {
+       cases := []struct {
+               name string
+               err  error
+               want bool
+       }{
+               {"nil", nil, false},
+               {"topic not found", errors.New("TopicNotFound: topic does not 
exist"), true},
+               {"topic terminated", errors.New("TopicTerminatedError: topic 
was terminated"), true},
+               {"subscription not found", errors.New("SubscriptionNotFound: 
sub does not exist"), true},
+               {"authorization", errors.New("AuthorizationError: not 
authorized"), true},
+               {"consumer busy", errors.New("ConsumerBusy: another consumer 
attached"), true},
+               {"invalid topic name", errors.New("InvalidTopicName: bad 
name"), true},
+               {"incompatible schema", errors.New("IncompatibleSchema: schema 
mismatch"), true},
+               {"consumer assign error", errors.New("ConsumerAssignError: 
dispatcher assign failed"), true},
+               {"not allowed", errors.New("NotAllowedError: action not 
permitted"), true},
+               {"service not ready (retriable)", errors.New("ServiceNotReady: 
please retry"), false},
+               {"metadata error (retriable)", errors.New("MetadataError: zk 
timeout"), false},
+               {"plain network error (retriable)", errors.New("dial tcp: i/o 
timeout"), false},
+       }
+       for _, tc := range cases {
+               t.Run(tc.name, func(t *testing.T) {
+                       assert.Equal(t, tc.want, 
isNonRetriableSubscribeError(tc.err))
+               })
+       }
+}
diff --git a/pulsar/consumer_zero_queue.go b/pulsar/consumer_zero_queue.go
index 97016c38..4978fae2 100644
--- a/pulsar/consumer_zero_queue.go
+++ b/pulsar/consumer_zero_queue.go
@@ -261,6 +261,12 @@ func (z *zeroQueueConsumer) NackID(msgID MessageID) {
 }
 
 func (z *zeroQueueConsumer) Close() {
+       z.closeWithCause(nil)
+}
+
+// closeWithCause closes the consumer and notifies any ConsumerCloseInterceptor
+// with the supplied cause. The hook fires exactly once per consumer.
+func (z *zeroQueueConsumer) closeWithCause(err error) {
        z.closeOnce.Do(func() {
                z.Lock()
                defer z.Unlock()
@@ -272,6 +278,7 @@ func (z *zeroQueueConsumer) Close() {
                z.rlq.close()
                z.metrics.ConsumersClosed.Inc()
                z.metrics.ConsumersPartitions.Sub(float64(1))
+               z.options.Interceptors.OnConsumerClose(z, err)
        })
 }
 

Reply via email to