This is an automated email from the ASF dual-hosted git repository.
nodece pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new a1765ef9 [fix][consumer] Add reconnect failure listener and auto-close
on max retry exhaustion (#1490)
a1765ef9 is described below
commit a1765ef9c22a4cbd7b7a4704b6136e96860b3815
Author: Pavel Zeger <[email protected]>
AuthorDate: Wed May 20 10:30:19 2026 +0300
[fix][consumer] Add reconnect failure listener and auto-close on max retry
exhaustion (#1490)
* [fix][consumer] Add reconnect failure listener and auto-close on max
retry exhaustion
* [fix][consumer] Fix consumer_partition.go review comment
* [fix][consumer] Fix consumer.go review comment
* [fix][consumer] Fix consumer_test.go review comment
* [fix][consumer] Fix according to Java client logic
* [fix][consumer] Fix according to Java client logic
* Apply suggestions from code review
Co-authored-by: Zixuan Liu <[email protected]>
* [fix][consumer] Fix review comments
---------
Co-authored-by: Zixuan Liu <[email protected]>
---
pulsar/consumer.go | 5 ++
pulsar/consumer_impl.go | 9 +++
pulsar/consumer_interceptor.go | 17 ++++++
pulsar/consumer_partition.go | 74 ++++++++++++++++++++--
pulsar/consumer_test.go | 136 +++++++++++++++++++++++++++++++++++++++++
pulsar/consumer_zero_queue.go | 7 +++
6 files changed, 243 insertions(+), 5 deletions(-)
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index f7005543..9aeabb03 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -240,6 +240,11 @@ type ConsumerOptions struct {
Schema Schema
// MaxReconnectToBroker sets the maximum retry number of
reconnectToBroker. (default: ultimate)
+ // When the retry budget is exhausted, or when the broker reports a non-retriable error
+ // (e.g. AuthorizationError, TopicNotFound, TopicTerminated, IncompatibleSchema), the
+ // consumer is closed. Applications can observe the close — and recover the cause as the
+ // err argument — by registering a ConsumerInterceptor that also implements
+ // ConsumerCloseInterceptor.
MaxReconnectToBroker *uint
// BackOffPolicyFunc parameterize the following options in the
reconnection logic to
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 5579d374..d76108a2 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -713,6 +713,14 @@ func (c *consumer) NackID(msgID MessageID) {
}
func (c *consumer) Close() {
+ c.closeWithCause(nil)
+}
+
+// closeWithCause closes the consumer and notifies any ConsumerCloseInterceptor
+// with the supplied cause. The hook fires exactly once per consumer; the cause
+// is captured by the goroutine that wins closeOnce so concurrent callers
+// cannot race the value.
+func (c *consumer) closeWithCause(err error) {
c.closeOnce.Do(func() {
c.stopDiscovery()
@@ -734,6 +742,7 @@ func (c *consumer) Close() {
c.rlq.close()
c.metrics.ConsumersClosed.Inc()
c.metrics.ConsumersPartitions.Sub(float64(len(c.consumers)))
+ c.options.Interceptors.OnConsumerClose(c, err)
})
}
diff --git a/pulsar/consumer_interceptor.go b/pulsar/consumer_interceptor.go
index db46b784..4ad6546c 100644
--- a/pulsar/consumer_interceptor.go
+++ b/pulsar/consumer_interceptor.go
@@ -28,6 +28,15 @@ type ConsumerInterceptor interface {
OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID)
}
+// ConsumerCloseInterceptor is an optional interface that a ConsumerInterceptor
+// may also implement to be notified once the consumer has been closed. The
+// hook fires exactly once per consumer. When err is nil, the close was
+// initiated by the user; a non-nil err carries the cause that triggered an
+// internal close (for example, exhausting MaxReconnectToBroker or a
+// non-retriable broker error).
+type ConsumerCloseInterceptor interface {
+ // OnConsumerClose receives the consumer that was closed and the cause of
+ // the close (nil when the close came from a plain Close() call).
+ OnConsumerClose(consumer Consumer, err error)
+}
+
type ConsumerInterceptors []ConsumerInterceptor
func (x ConsumerInterceptors) BeforeConsume(message ConsumerMessage) {
@@ -48,4 +57,12 @@ func (x ConsumerInterceptors) OnNegativeAcksSend(consumer
Consumer, msgIDs []Mes
}
}
+// OnConsumerClose forwards the close notification, in slice order, to every
+// interceptor in x that also implements ConsumerCloseInterceptor. Interceptors
+// that do not implement the optional interface are skipped.
+func (x ConsumerInterceptors) OnConsumerClose(consumer Consumer, err error) {
+ for i := range x {
+ if c, ok := x[i].(ConsumerCloseInterceptor); ok {
+ c.OnConsumerClose(consumer, err)
+ }
+ }
+}
+
var defaultConsumerInterceptors = make(ConsumerInterceptors, 0)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 8c6d8916..d7aba524 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -93,6 +93,56 @@ const (
noMessageEntry = -1
)
+// Broker error markers. When the broker reports any of these in response
+// to a Subscribe command, retrying will not recover — the consumer must give
+// up and notify the application. errMsgTopicNotFound and errMsgTopicTerminated
+// are reused from the producer path; the rest are consumer-specific.
+const (
+ errMsgConsumerSubscriptionNotFound = "SubscriptionNotFound"
+ errMsgConsumerAuthorizationError = "AuthorizationError"
+ errMsgConsumerBusy = "ConsumerBusy"
+ errMsgConsumerInvalidTopicName = "InvalidTopicName"
+ errMsgConsumerIncompatibleSchema = "IncompatibleSchema"
+ errMsgConsumerAssignError = "ConsumerAssignError"
+ errMsgConsumerNotAllowedError = "NotAllowedError"
+)
+
+// nonRetriableSubscribeErrorMarkers is the full set of broker error names that
+// should terminate the reconnect loop immediately. Detection is by substring
+// match on the error message, since the broker error arrives wire-formatted as
+// "<ServerError>: <msg>" (see grabConn -> BaseCommand_ERROR handling).
+var nonRetriableSubscribeErrorMarkers = []string{
+ errMsgTopicNotFound,
+ errMsgTopicTerminated,
+ errMsgConsumerSubscriptionNotFound,
+ errMsgConsumerAuthorizationError,
+ errMsgConsumerBusy,
+ errMsgConsumerInvalidTopicName,
+ errMsgConsumerIncompatibleSchema,
+ errMsgConsumerAssignError,
+ errMsgConsumerNotAllowedError,
+}
+
+// isNonRetriableSubscribeError reports whether err's message contains any of
+// the markers listed in nonRetriableSubscribeErrorMarkers. A nil err is never
+// treated as non-retriable. The check is a plain substring match, so a marker
+// appearing anywhere in the message (not only as the leading error name)
+// counts as a match.
+func isNonRetriableSubscribeError(err error) bool {
+ if err == nil {
+ return false
+ }
+ msg := err.Error()
+ for _, marker := range nonRetriableSubscribeErrorMarkers {
+ if strings.Contains(msg, marker) {
+ return true
+ }
+ }
+ return false
+}
+
+// causalCloser is implemented by Consumer wrappers that can record the reason
+// they were closed and forward it to a ConsumerCloseInterceptor. The reconnect
+// loop uses this to surface the underlying error when it gives up; when the
+// parent consumer does not implement it, the loop falls back to a plain
+// Close().
+type causalCloser interface {
+ // closeWithCause closes the consumer, passing err on as the close cause.
+ closeWithCause(err error)
+}
+
type partitionConsumerOpts struct {
topic string
consumerName string
@@ -2062,6 +2112,19 @@ func (pc *partitionConsumer)
reconnectToBroker(connectionClosed *connectionClose
assignedBrokerURL = connectionClosed.assignedBrokerURL
}
+ var giveUpNotified bool
+ notifyReconnectGiveUp := func(cause error) {
+ if giveUpNotified {
+ return
+ }
+ giveUpNotified = true
+ if cc, ok := pc.parentConsumer.(causalCloser); ok {
+ go cc.closeWithCause(cause)
+ } else {
+ go pc.parentConsumer.Close()
+ }
+ }
+
opFn := func() (struct{}, error) {
if maxRetry == 0 {
return struct{}{}, nil
@@ -2088,10 +2151,9 @@ func (pc *partitionConsumer)
reconnectToBroker(connectionClosed *connectionClose
return struct{}{}, nil
}
pc.log.WithError(err).Error("Failed to create consumer at
reconnect")
- errMsg := err.Error()
- if strings.Contains(errMsg, errMsgTopicNotFound) {
- // when topic is deleted, we should give up
reconnection.
- pc.log.Warn("Topic Not Found.")
+ if isNonRetriableSubscribeError(err) {
+ pc.log.WithError(err).Warn("Non-retriable error during
reconnect, giving up")
+ notifyReconnectGiveUp(err)
return struct{}{}, nil
}
@@ -2099,8 +2161,10 @@ func (pc *partitionConsumer)
reconnectToBroker(connectionClosed *connectionClose
maxRetry--
}
pc.metrics.ConsumersReconnectFailure.Inc()
- if maxRetry == 0 || bo.IsMaxBackoffReached() {
+ if maxRetry == 0 {
pc.metrics.ConsumersReconnectMaxRetry.Inc()
+ notifyReconnectGiveUp(errors.New("max retry attempts
reached for reconnecting to broker"))
+ return struct{}{}, nil
}
return struct{}{}, err
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 4e2cbc4a..e1964287 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -5927,3 +5927,139 @@ func TestSelectConnectionForSameConsumer(t *testing.T) {
"The consumer uses a different connection when
reconnecting")
}
}
+
+// closeInterceptor captures the (consumer, err) pair delivered to
+// ConsumerCloseInterceptor.OnConsumerClose and signals via fired.
+type closeInterceptor struct {
+ fired chan struct{} // closed on the first OnConsumerClose call
+ consumer Consumer // consumer argument of the first OnConsumerClose call
+ err error // err argument of the first OnConsumerClose call
+ once sync.Once // restricts recording (and close(fired)) to the first call
+}
+
+// BeforeConsume, OnAcknowledge and OnNegativeAcksSend are no-ops; they are
+// present so that a *closeInterceptor can be placed in a ConsumerInterceptors
+// slice.
+func (c *closeInterceptor) BeforeConsume(_ ConsumerMessage) {}
+func (c *closeInterceptor) OnAcknowledge(_ Consumer, _ MessageID) {}
+func (c *closeInterceptor) OnNegativeAcksSend(_ Consumer, _ []MessageID) {}
+// OnConsumerClose records its arguments and closes fired. The sync.Once guard
+// turns any repeat call into a no-op, so fired is never closed twice.
+func (c *closeInterceptor) OnConsumerClose(consumer Consumer, err error) {
+ c.once.Do(func() {
+ c.consumer = consumer
+ c.err = err
+ close(c.fired)
+ })
+}
+
+// TestConsumerOnCloseInterceptorOnMaxReconnect starts a dedicated Pulsar
+// container, subscribes with MaxReconnectToBroker set to 1, terminates the
+// container, and then checks that the close interceptor fires with a non-nil
+// cause and the parent consumer, and that the partition consumer ends up in
+// the consumerClosed state.
+func TestConsumerOnCloseInterceptorOnMaxReconnect(t *testing.T) {
+ req := testcontainers.ContainerRequest{
+ Image: getPulsarTestImage(),
+ ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+ WaitingFor: wait.ForExposedPort(),
+ Cmd: []string{"bin/pulsar", "standalone", "-nfw",
"--advertised-address", "localhost"},
+ }
+ c, err := testcontainers.GenericContainer(context.Background(),
testcontainers.GenericContainerRequest{
+ ContainerRequest: req,
+ Started: true,
+ })
+ require.NoError(t, err)
+ // The container is also terminated explicitly further down, so an error
+ // from this second Terminate is only logged rather than failing the test.
+ t.Cleanup(func() {
+ if err := c.Terminate(context.Background()); err != nil {
+ t.Logf("container terminate (cleanup) returned: %v",
err)
+ }
+ })
+ endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+ require.NoError(t, err)
+
+ pulsarClient, err := NewClient(ClientOptions{
+ URL: endpoint,
+ ConnectionTimeout: 3 * time.Second,
+ OperationTimeout: 5 * time.Second,
+ })
+ require.NoError(t, err)
+ defer pulsarClient.Close()
+
+ maxRetry := uint(1)
+ interceptor := &closeInterceptor{fired: make(chan struct{})}
+
+ topic := newTopicName()
+ var testConsumer Consumer
+ // Subscribe is retried once per second for up to 30s — presumably because
+ // the broker may not accept subscriptions right after its port opens.
+ require.Eventually(t, func() bool {
+ testConsumer, err = pulsarClient.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "test-on-close-interceptor",
+ MaxReconnectToBroker: &maxRetry,
+ BackOffPolicyFunc: func() backoff.Policy {
+ return
newTestBackoffPolicy(100*time.Millisecond, 1*time.Second)
+ },
+ Interceptors: ConsumerInterceptors{interceptor},
+ })
+ return err == nil
+ }, 30*time.Second, 1*time.Second)
+ defer testConsumer.Close()
+
+ // Take the broker away so that the consumer's reconnect attempts fail.
+ require.NoError(t, c.Terminate(context.Background()))
+
+ // Wait for the interceptor to signal that OnConsumerClose was invoked.
+ select {
+ case <-interceptor.fired:
+ case <-time.After(30 * time.Second):
+ t.Fatal("OnConsumerClose was not called within timeout")
+ }
+
+ assert.NotNil(t, interceptor.err, "interceptor should receive the cause
of the close")
+ assert.Equal(t, testConsumer, interceptor.consumer, "interceptor should
receive the parent consumer")
+
+ // The interceptor having fired is not enough: also wait for the first
+ // partition consumer to report the closed state.
+ pc := testConsumer.(*consumer).consumers[0]
+ require.Eventually(t, func() bool {
+ return pc.getConsumerState() == consumerClosed
+ }, 30*time.Second, 100*time.Millisecond, "consumer should be closed
after exhausting max reconnect retries")
+}
+
+// TestConsumerOnCloseInterceptorOnUserClose verifies that calling Close() on a
+// consumer invokes OnConsumerClose with a nil cause and with that same
+// consumer as the argument.
+func TestConsumerOnCloseInterceptorOnUserClose(t *testing.T) {
+ client, err := NewClient(ClientOptions{URL: serviceURL})
+ require.NoError(t, err)
+ defer client.Close()
+
+ interceptor := &closeInterceptor{fired: make(chan struct{})}
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: newTopicName(),
+ SubscriptionName: "test-on-close-user",
+ Interceptors: ConsumerInterceptors{interceptor},
+ })
+ require.NoError(t, err)
+
+ consumer.Close()
+
+ // Wait for the interceptor to signal that OnConsumerClose was invoked.
+ select {
+ case <-interceptor.fired:
+ case <-time.After(5 * time.Second):
+ t.Fatal("OnConsumerClose was not called within timeout")
+ }
+
+ assert.Nil(t, interceptor.err, "user-initiated close should report nil
cause")
+ assert.Equal(t, consumer, interceptor.consumer)
+}
+
+// TestIsNonRetriableSubscribeError is a table-driven check of
+// isNonRetriableSubscribeError. It covers a nil error, one message per
+// non-retriable broker error name, and several messages that are expected to
+// remain retriable.
+func TestIsNonRetriableSubscribeError(t *testing.T) {
+ cases := []struct {
+ name string
+ err error
+ want bool
+ }{
+ {"nil", nil, false},
+ {"topic not found", errors.New("TopicNotFound: topic does not
exist"), true},
+ {"topic terminated", errors.New("TopicTerminatedError: topic
was terminated"), true},
+ {"subscription not found", errors.New("SubscriptionNotFound:
sub does not exist"), true},
+ {"authorization", errors.New("AuthorizationError: not
authorized"), true},
+ {"consumer busy", errors.New("ConsumerBusy: another consumer
attached"), true},
+ {"invalid topic name", errors.New("InvalidTopicName: bad
name"), true},
+ {"incompatible schema", errors.New("IncompatibleSchema: schema
mismatch"), true},
+ {"consumer assign error", errors.New("ConsumerAssignError:
dispatcher assign failed"), true},
+ {"not allowed", errors.New("NotAllowedError: action not
permitted"), true},
+ {"service not ready (retriable)", errors.New("ServiceNotReady:
please retry"), false},
+ {"metadata error (retriable)", errors.New("MetadataError: zk
timeout"), false},
+ {"plain network error (retriable)", errors.New("dial tcp: i/o
timeout"), false},
+ }
+ // Each case runs as its own subtest named after the scenario.
+ for _, tc := range cases {
+ t.Run(tc.name, func(t *testing.T) {
+ assert.Equal(t, tc.want,
isNonRetriableSubscribeError(tc.err))
+ })
+ }
+}
diff --git a/pulsar/consumer_zero_queue.go b/pulsar/consumer_zero_queue.go
index 97016c38..4978fae2 100644
--- a/pulsar/consumer_zero_queue.go
+++ b/pulsar/consumer_zero_queue.go
@@ -261,6 +261,12 @@ func (z *zeroQueueConsumer) NackID(msgID MessageID) {
}
func (z *zeroQueueConsumer) Close() {
+ z.closeWithCause(nil)
+}
+
+// closeWithCause closes the consumer and notifies any ConsumerCloseInterceptor
+// with the supplied cause. The hook fires exactly once per consumer.
+func (z *zeroQueueConsumer) closeWithCause(err error) {
z.closeOnce.Do(func() {
z.Lock()
defer z.Unlock()
@@ -272,6 +278,7 @@ func (z *zeroQueueConsumer) Close() {
z.rlq.close()
z.metrics.ConsumersClosed.Inc()
z.metrics.ConsumersPartitions.Sub(float64(1))
+ z.options.Interceptors.OnConsumerClose(z, err)
})
}