Copilot commented on code in PR #1490:
URL: https://github.com/apache/pulsar-client-go/pull/1490#discussion_r3257204758
##########
pulsar/consumer.go:
##########
@@ -240,6 +240,11 @@ type ConsumerOptions struct {
Schema Schema
// MaxReconnectToBroker sets the maximum retry number of reconnectToBroker. (default: ultimate)
+ // When the retry budget is exhausted, or when the broker reports a non-retriable error
+ // (e.g. AuthorizationError, TopicNotFound, TopicTerminated, IncompatibleSchema), the
+ // consumer is closed. Applications can observe the close — and recover the cause as the
+ // err argument — by registering a ConsumerInterceptor that also implements
+ // ConsumerCloseInterceptor.
MaxReconnectToBroker *uint
Review Comment:
The PR description states this change adds two opt-in fields
(`MaxReconnectToBrokerListener` and `CloseConsumerOnMaxReconnectToBroker`) with
"no behaviour change for existing consumers." The actual diff diverges
significantly: instead, a new `ConsumerCloseInterceptor` is introduced and the
consumer is now unconditionally closed when `MaxReconnectToBroker` is exhausted
or when a non-retriable broker error is observed. Users who previously set
`MaxReconnectToBroker` relied on the silent-failure behavior (consumer remains
alive); with this PR their consumer will now be force-closed once retries are
exhausted. Either the PR description should be updated to reflect that this is
an intentional behavior change (and called out in CHANGELOG/release notes), or
the auto-close path should be made opt-in as the description promises.
##########
pulsar/consumer_partition.go:
##########
@@ -93,6 +93,56 @@ const (
noMessageEntry = -1
)
+// Broker error markers. When the broker reports any of these in response
+// to a Subscribe command, retrying will not recover — the consumer must give up and notify
+// the application. errMsgTopicNotFound and errMsgTopicTerminated are reused from the producer
+// path; the rest are consumer-specific.
+const (
+ errMsgConsumerSubscriptionNotFound = "SubscriptionNotFound"
+ errMsgConsumerAuthorizationError = "AuthorizationError"
+ errMsgConsumerBusy = "ConsumerBusy"
+ errMsgConsumerInvalidTopicName = "InvalidTopicName"
+ errMsgConsumerIncompatibleSchema = "IncompatibleSchema"
+ errMsgConsumerAssignError = "ConsumerAssignError"
+ errMsgConsumerNotAllowedError = "NotAllowedError"
+)
+
+// nonRetriableSubscribeErrorMarkers is the full set of broker error names that should
+// terminate the reconnect loop immediately. Detection is by substring match on the
+// error message, since the broker error arrives wire-formatted as "<ServerError>: <msg>"
+// (see grabConn -> BaseCommand_ERROR handling).
+var nonRetriableSubscribeErrorMarkers = []string{
+ errMsgTopicNotFound,
+ errMsgTopicTerminated,
+ errMsgConsumerSubscriptionNotFound,
+ errMsgConsumerAuthorizationError,
+ errMsgConsumerBusy,
+ errMsgConsumerInvalidTopicName,
+ errMsgConsumerIncompatibleSchema,
+ errMsgConsumerAssignError,
+ errMsgConsumerNotAllowedError,
+}
+
+func isNonRetriableSubscribeError(err error) bool {
+ if err == nil {
+ return false
+ }
+ msg := err.Error()
+ for _, marker := range nonRetriableSubscribeErrorMarkers {
+ if strings.Contains(msg, marker) {
+ return true
+ }
+ }
+ return false
+}
Review Comment:
Detecting non-retriable broker errors by substring matching against
`err.Error()` is fragile. The broker wire error format is "<ServerError>:
<msg>", but the `<msg>` portion is broker-supplied and may legitimately contain
any of these substrings (e.g. a message that mentions "NotAllowedError" or
"ConsumerBusy" while reporting an unrelated, retriable condition would now
cause the consumer to give up and be closed). Where possible, prefer matching
against the typed `ServerError` value returned by `BaseCommand_ERROR` handling
(or check by error type/wrapping) rather than string-contains over the full
message. At minimum, anchor the match to the leading `"<Name>:"` prefix so that
occurrences of the same token in a free-form message body don't trigger a false
positive.
##########
pulsar/consumer_interceptor.go:
##########
@@ -28,6 +28,15 @@ type ConsumerInterceptor interface {
OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID)
}
+// ConsumerCloseInterceptor is an optional interface that a ConsumerInterceptor
+// may also implement to be notified once the consumer has been closed. The hook
+// fires exactly once per consumer. When err is nil, the close was initiated by
+// the user; a non-nil err carries the cause that triggered an internal close
+// (for example, exhausting MaxReconnectToBroker or a non-retriable broker error).
Review Comment:
The new `ConsumerCloseInterceptor.OnConsumerClose` hook is invoked only in
`consumer.closeWithCause` and `zeroQueueConsumer.closeWithCause`. The other
public `Consumer` implementations — `multiTopicConsumer.Close`
(consumer_multitopic.go:333) and `regexConsumer.Close` (consumer_regex.go:316)
— do not call `Interceptors.OnConsumerClose`, so an application that uses
`client.Subscribe` with multiple topics or `TopicsPattern` will never receive
the close callback documented here. Either invoke
`Interceptors.OnConsumerClose` from these Close paths as well, or document the
limitation explicitly in `ConsumerCloseInterceptor` and on
`MaxReconnectToBroker`.
##########
pulsar/consumer_partition.go:
##########
@@ -2088,19 +2151,20 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
return struct{}{}, nil
}
pc.log.WithError(err).Error("Failed to create consumer at reconnect")
- errMsg := err.Error()
- if strings.Contains(errMsg, errMsgTopicNotFound) {
- // when topic is deleted, we should give up reconnection.
- pc.log.Warn("Topic Not Found.")
+ if isNonRetriableSubscribeError(err) {
+ pc.log.WithError(err).Warn("Non-retriable error during reconnect, giving up")
+ notifyReconnectGiveUp(err)
return struct{}{}, nil
}
if maxRetry > 0 {
maxRetry--
}
pc.metrics.ConsumersReconnectFailure.Inc()
- if maxRetry == 0 || bo.IsMaxBackoffReached() {
+ if maxRetry == 0 {
pc.metrics.ConsumersReconnectMaxRetry.Inc()
Review Comment:
Previously, `ConsumersReconnectMaxRetry` was incremented in two cases:
`maxRetry == 0` OR `bo.IsMaxBackoffReached()`. After this change the
`IsMaxBackoffReached()` branch is removed, so any user dashboard/alert that
counts this metric to detect backoff-exhausted reconnect attempts (e.g., the
common case where `MaxReconnectToBroker` is unset and the only way to detect
"we've been retrying forever at the ceiling" was via this metric) will silently
stop firing. This is a behavior change worth calling out, and ideally the
metric should still increment whenever the loop is in the give-up condition
that previously incremented it.
##########
pulsar/consumer_test.go:
##########
@@ -5921,3 +5921,139 @@ func TestSelectConnectionForSameConsumer(t *testing.T) {
"The consumer uses a different connection when reconnecting")
}
}
+
+// closeInterceptor captures the (consumer, err) pair delivered to
+// ConsumerCloseInterceptor.OnConsumerClose and signals via fired.
+type closeInterceptor struct {
+ fired chan struct{}
+ consumer Consumer
+ err error
+ once sync.Once
+}
+
+func (c *closeInterceptor) BeforeConsume(_ ConsumerMessage) {}
+func (c *closeInterceptor) OnAcknowledge(_ Consumer, _ MessageID) {}
+func (c *closeInterceptor) OnNegativeAcksSend(_ Consumer, _ []MessageID) {}
+func (c *closeInterceptor) OnConsumerClose(consumer Consumer, err error) {
+ c.once.Do(func() {
+ c.consumer = consumer
+ c.err = err
+ close(c.fired)
+ })
+}
+
+func TestConsumerOnCloseInterceptorOnMaxReconnect(t *testing.T) {
+ req := testcontainers.ContainerRequest{
+ Image: getPulsarTestImage(),
+ ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+ WaitingFor: wait.ForExposedPort(),
+ Cmd: []string{"bin/pulsar", "standalone", "-nfw", "--advertised-address", "localhost"},
+ }
+ c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
+ ContainerRequest: req,
+ Started: true,
+ })
+ require.NoError(t, err)
+ t.Cleanup(func() {
+ if err := c.Terminate(context.Background()); err != nil {
+ t.Logf("container terminate (cleanup) returned: %v", err)
+ }
+ })
+ endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+ require.NoError(t, err)
+
+ pulsarClient, err := NewClient(ClientOptions{
+ URL: endpoint,
+ ConnectionTimeout: 3 * time.Second,
+ OperationTimeout: 5 * time.Second,
+ })
+ require.NoError(t, err)
+ defer pulsarClient.Close()
+
+ maxRetry := uint(1)
+ interceptor := &closeInterceptor{fired: make(chan struct{})}
+
+ topic := newTopicName()
+ var testConsumer Consumer
+ require.Eventually(t, func() bool {
+ testConsumer, err = pulsarClient.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "test-on-close-interceptor",
+ MaxReconnectToBroker: &maxRetry,
+ BackOffPolicyFunc: func() backoff.Policy {
+ return newTestBackoffPolicy(100*time.Millisecond, 1*time.Second)
+ },
+ Interceptors: ConsumerInterceptors{interceptor},
+ })
+ return err == nil
+ }, 30*time.Second, 1*time.Second)
+ defer testConsumer.Close()
+
+ require.NoError(t, c.Terminate(context.Background()))
+
+ select {
+ case <-interceptor.fired:
+ case <-time.After(30 * time.Second):
+ t.Fatal("OnConsumerClose was not called within timeout")
+ }
+
+ assert.NotNil(t, interceptor.err, "interceptor should receive the cause of the close")
+ assert.Equal(t, testConsumer, interceptor.consumer, "interceptor should receive the parent consumer")
+
+ pc := testConsumer.(*consumer).consumers[0]
+ require.Eventually(t, func() bool {
+ return pc.getConsumerState() == consumerClosed
+ }, 30*time.Second, 100*time.Millisecond, "consumer should be closed after exhausting max reconnect retries")
+}
Review Comment:
`TestConsumerOnCloseInterceptorOnMaxReconnect` starts a real
`testcontainers` Pulsar standalone and shuts it down mid-test to drive the
reconnect path. This makes the test slow (multiple 30s timeouts) and dependent
on Docker being available, while the same end-to-end behavior (listener fires
once and consumer transitions to `consumerClosed` on max-retry exhaustion) can
be exercised with a unit-level test that constructs `partitionConsumerOpts`
directly — as the PR description itself acknowledges ("Unit-level verification
can be done by constructing a `partitionConsumerOpts` directly..."). Consider
replacing or complementing the container-based test with a faster unit test so
the new behavior is covered by CI without requiring a live broker.
##########
pulsar/consumer_test.go:
##########
@@ -5921,3 +5921,139 @@ func TestSelectConnectionForSameConsumer(t *testing.T) {
"The consumer uses a different connection when reconnecting")
}
}
+
+func TestConsumerOnCloseInterceptorOnUserClose(t *testing.T) {
+ client, err := NewClient(ClientOptions{URL: serviceURL})
+ require.NoError(t, err)
+ defer client.Close()
+
+ interceptor := &closeInterceptor{fired: make(chan struct{})}
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: newTopicName(),
+ SubscriptionName: "test-on-close-user",
+ Interceptors: ConsumerInterceptors{interceptor},
+ })
+ require.NoError(t, err)
+
+ consumer.Close()
+
+ select {
+ case <-interceptor.fired:
+ case <-time.After(5 * time.Second):
+ t.Fatal("OnConsumerClose was not called within timeout")
+ }
+
+ assert.Nil(t, interceptor.err, "user-initiated close should report nil cause")
+ assert.Equal(t, consumer, interceptor.consumer)
+}
+
+func TestIsNonRetriableSubscribeError(t *testing.T) {
+ cases := []struct {
+ name string
+ err error
+ want bool
+ }{
+ {"nil", nil, false},
+ {"topic not found", errors.New("TopicNotFound: topic does not exist"), true},
+ {"topic terminated", errors.New("TopicTerminatedError: topic was terminated"), true},
+ {"subscription not found", errors.New("SubscriptionNotFound: sub does not exist"), true},
+ {"authorization", errors.New("AuthorizationError: not authorized"), true},
+ {"consumer busy", errors.New("ConsumerBusy: another consumer attached"), true},
+ {"invalid topic name", errors.New("InvalidTopicName: bad name"), true},
+ {"incompatible schema", errors.New("IncompatibleSchema: schema mismatch"), true},
+ {"consumer assign error", errors.New("ConsumerAssignError: dispatcher assign failed"), true},
+ {"not allowed", errors.New("NotAllowedError: action not permitted"), true},
+ {"service not ready (retriable)", errors.New("ServiceNotReady: please retry"), false},
+ {"metadata error (retriable)", errors.New("MetadataError: zk timeout"), false},
+ {"plain network error (retriable)", errors.New("dial tcp: i/o timeout"), false},
+ }
+ for _, tc := range cases {
+ t.Run(tc.name, func(t *testing.T) {
+ assert.Equal(t, tc.want, isNonRetriableSubscribeError(tc.err))
+ })
+ }
+}
Review Comment:
`TestIsNonRetriableSubscribeError` only exercises happy-path strings
("AuthorizationError: ...", etc.). It doesn't cover the substring-collision
edge cases that the implementation is vulnerable to (e.g. a retriable error
whose free-form message contains the literal text "AuthorizationError" or
"ConsumerBusy"). Adding such cases would make the test more meaningful and
would have caught false-positive matches.
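Cases like the ones below could be added to the table, assuming the same `<Name>: <msg>` shape. The sketch compares a substring matcher, which mirrors the approach in the diff, with a prefix-anchored one. The marker list is trimmed for brevity, and both helpers are illustrative rather than the PR's code. The substring matcher returns `true` for the first three retriable errors. The anchored matcher agrees with `want` on all four.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

var markers = []string{"AuthorizationError", "ConsumerBusy", "NotAllowedError", "TopicNotFound"}

// substringMatch mirrors the approach in the diff: any occurrence counts.
func substringMatch(err error) bool {
	for _, m := range markers {
		if strings.Contains(err.Error(), m) {
			return true
		}
	}
	return false
}

// prefixMatch only inspects the leading "<Name>:" token.
func prefixMatch(err error) bool {
	name, _, ok := strings.Cut(err.Error(), ":")
	if !ok {
		return false
	}
	for _, m := range markers {
		if strings.TrimSpace(name) == m {
			return true
		}
	}
	return false
}

// Collision cases: retriable errors whose free-form body mentions a marker,
// plus one genuine anchored marker as a control.
var collisionCases = []struct {
	name string
	err  error
	want bool
}{
	{"retriable body mentions AuthorizationError", errors.New("ServiceNotReady: auth cache refresh after AuthorizationError, retry"), false},
	{"retriable body mentions ConsumerBusy", errors.New("MetadataError: previous ConsumerBusy state not yet cleared"), false},
	{"marker only in body of non-broker error", errors.New("dial tcp: lookup TopicNotFound.example: no such host"), false},
	{"genuine anchored marker", errors.New("ConsumerBusy: another consumer attached"), true},
}

func main() {
	for _, tc := range collisionCases {
		fmt.Printf("%-45s substring=%v prefix=%v want=%v\n",
			tc.name, substringMatch(tc.err), prefixMatch(tc.err), tc.want)
	}
}
```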
##########
pulsar/consumer.go:
##########
@@ -240,6 +240,11 @@ type ConsumerOptions struct {
Schema Schema
// MaxReconnectToBroker sets the maximum retry number of reconnectToBroker. (default: ultimate)
+ // When the retry budget is exhausted, or when the broker reports a non-retriable error
+ // (e.g. AuthorizationError, TopicNotFound, TopicTerminated, IncompatibleSchema), the
+ // consumer is closed. Applications can observe the close — and recover the cause as the
+ // err argument — by registering a ConsumerInterceptor that also implements
Review Comment:
This doc update describes a significant behavior change — the consumer is
now always closed when `MaxReconnectToBroker` is exhausted or a non-retriable
error is reported — but the change is buried in a paragraph about the recovery
callback. Consumers that previously set `MaxReconnectToBroker` to bound retries
(knowing the consumer would stay alive and stop retrying) will now find their
consumer auto-closed. Please call out the behavior change explicitly (e.g.
"Setting this option now also causes the consumer to be closed when the retry
budget is exhausted") and consider mentioning it in release notes/CHANGELOG.
--
This is an automated message from the Apache Git Service.