Copilot commented on code in PR #1490:
URL: https://github.com/apache/pulsar-client-go/pull/1490#discussion_r3257204758


##########
pulsar/consumer.go:
##########
@@ -240,6 +240,11 @@ type ConsumerOptions struct {
        Schema Schema
 
        // MaxReconnectToBroker sets the maximum retry number of 
reconnectToBroker. (default: ultimate)
+       // When the retry budget is exhausted, or when the broker reports a 
non-retriable error
+       // (e.g. AuthorizationError, TopicNotFound, TopicTerminated, 
IncompatibleSchema), the
+       // consumer is closed. Applications can observe the close — and recover 
the cause as the
+       // err argument — by registering a ConsumerInterceptor that also 
implements
+       // ConsumerCloseInterceptor.
        MaxReconnectToBroker *uint

Review Comment:
   The PR description states that this change adds two opt-in fields
(`MaxReconnectToBrokerListener` and `CloseConsumerOnMaxReconnectToBroker`) with
"no behaviour change for existing consumers." The actual diff does something
different: it introduces a new `ConsumerCloseInterceptor`, and the consumer is
now unconditionally closed when `MaxReconnectToBroker` is exhausted or when a
non-retriable broker error is observed. Users who set `MaxReconnectToBroker`
and relied on the previous silent-failure behavior (the consumer stays alive
after retries stop) will now have their consumer force-closed once retries are
exhausted. Either update the PR description to state that this is an
intentional behavior change (and call it out in the CHANGELOG/release notes),
or make the auto-close path opt-in as the description promises.



##########
pulsar/consumer_partition.go:
##########
@@ -93,6 +93,56 @@ const (
        noMessageEntry = -1
 )
 
+// Broker error markers. When the broker reports any of these in response
+// to a Subscribe command, retrying will not recover — the consumer must give 
up and notify
+// the application. errMsgTopicNotFound and errMsgTopicTerminated are reused 
from the producer
+// path; the rest are consumer-specific.
+const (
+       errMsgConsumerSubscriptionNotFound = "SubscriptionNotFound"
+       errMsgConsumerAuthorizationError   = "AuthorizationError"
+       errMsgConsumerBusy                 = "ConsumerBusy"
+       errMsgConsumerInvalidTopicName     = "InvalidTopicName"
+       errMsgConsumerIncompatibleSchema   = "IncompatibleSchema"
+       errMsgConsumerAssignError          = "ConsumerAssignError"
+       errMsgConsumerNotAllowedError      = "NotAllowedError"
+)
+
+// nonRetriableSubscribeErrorMarkers is the full set of broker error names that should
+// terminate the reconnect loop immediately. Detection is by substring match on the
+// error message, since the broker error arrives wire-formatted as "<ServerError>: <msg>"
+// (see grabConn -> BaseCommand_ERROR handling).
+var nonRetriableSubscribeErrorMarkers = []string{
+       errMsgTopicNotFound,
+       errMsgTopicTerminated,
+       errMsgConsumerSubscriptionNotFound,
+       errMsgConsumerAuthorizationError,
+       errMsgConsumerBusy,
+       errMsgConsumerInvalidTopicName,
+       errMsgConsumerIncompatibleSchema,
+       errMsgConsumerAssignError,
+       errMsgConsumerNotAllowedError,
+}
+
+func isNonRetriableSubscribeError(err error) bool {
+       if err == nil {
+               return false
+       }
+       msg := err.Error()
+       for _, marker := range nonRetriableSubscribeErrorMarkers {
+               if strings.Contains(msg, marker) {
+                       return true
+               }
+       }
+       return false
+}

Review Comment:
   Detecting non-retriable broker errors by substring matching against
`err.Error()` is fragile. The broker wire error format is "<ServerError>:
<msg>", but the `<msg>` portion is broker-supplied and may legitimately contain
any of these substrings. For example, a retriable error whose message merely
mentions "NotAllowedError" or "ConsumerBusy" would now cause the consumer to
give up and be closed. Where possible, prefer matching against the typed
`ServerError` value returned by `BaseCommand_ERROR` handling (or check by error
type/wrapping) rather than string-contains over the full message. At minimum,
anchor the match to the leading `"<Name>:"` prefix so that occurrences of the
same token in a free-form message body don't trigger a false positive.
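
   A minimal sketch of the prefix-anchored fallback described above. It assumes
the error text really does start with "<Name>:" as the diff comment states; if
the client wraps the error with extra leading context, the anchor would have to
move. The names `hasNonRetriablePrefix` and `isNonRetriable` are hypothetical,
and the name spellings are copied from the prefixes in the PR's test table, not
checked against the broker.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// nonRetriableNames lists the leading error names treated as terminal.
// Assumption: spellings come from the prefixes used in the PR's test table;
// they are not verified against the broker's ServerError enum.
var nonRetriableNames = map[string]struct{}{
	"TopicNotFound":        {},
	"TopicTerminatedError": {},
	"SubscriptionNotFound": {},
	"AuthorizationError":   {},
	"ConsumerBusy":         {},
	"InvalidTopicName":     {},
	"IncompatibleSchema":   {},
	"ConsumerAssignError":  {},
	"NotAllowedError":      {},
}

// hasNonRetriablePrefix reports whether msg starts with "<Name>:" where
// <Name> is one of nonRetriableNames. Text after the first colon is ignored,
// so a marker that only appears in the free-form body does not match.
func hasNonRetriablePrefix(msg string) bool {
	name, _, found := strings.Cut(msg, ":")
	if !found {
		return false
	}
	_, ok := nonRetriableNames[name]
	return ok
}

// isNonRetriable is the error-typed wrapper (hypothetical name).
func isNonRetriable(err error) bool {
	return err != nil && hasNonRetriablePrefix(err.Error())
}

func main() {
	// Marker is the leading name: treated as non-retriable.
	fmt.Println(isNonRetriable(errors.New("ConsumerBusy: another consumer attached")))
	// Marker appears only in the message body: still retriable.
	fmt.Println(isNonRetriable(errors.New("ServiceNotReady: retry after ConsumerBusy clears")))
}
```

   Matching on the typed `ServerError` value would still be the more robust
option; this only narrows the string match.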



##########
pulsar/consumer_interceptor.go:
##########
@@ -28,6 +28,15 @@ type ConsumerInterceptor interface {
        OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID)
 }
 
+// ConsumerCloseInterceptor is an optional interface that a ConsumerInterceptor
+// may also implement to be notified once the consumer has been closed. The hook
+// fires exactly once per consumer. When err is nil, the close was initiated by
+// the user; a non-nil err carries the cause that triggered an internal close
+// (for example, exhausting MaxReconnectToBroker or a non-retriable broker error).

Review Comment:
   The new `ConsumerCloseInterceptor.OnConsumerClose` hook is invoked only in 
`consumer.closeWithCause` and `zeroQueueConsumer.closeWithCause`. The other 
public `Consumer` implementations — `multiTopicConsumer.Close` 
(consumer_multitopic.go:333) and `regexConsumer.Close` (consumer_regex.go:316) 
— do not call `Interceptors.OnConsumerClose`, so an application that uses 
`client.Subscribe` with multiple topics or `TopicsPattern` will never receive 
the close callback documented here. Either invoke 
`Interceptors.OnConsumerClose` from these Close paths as well, or document the 
limitation explicitly in `ConsumerCloseInterceptor` and on 
`MaxReconnectToBroker`.
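
   A rough sketch of the first option, using simplified stand-in types rather
than the real client types (the real `ConsumerInterceptor` has more methods,
and the real `Close` has to tear down child consumers). It shows the optional
interface being discovered by type assertion and the hook being fired once from
a multi-topic style `Close`.

```go
package main

import (
	"fmt"
	"sync"
)

// Consumer and ConsumerInterceptor are simplified stand-ins for the client's
// interfaces; only the shape needed for this sketch is kept.
type Consumer interface{ Name() string }

type ConsumerInterceptor interface {
	OnAcknowledge(c Consumer)
}

// ConsumerCloseInterceptor is the optional extension interface.
type ConsumerCloseInterceptor interface {
	OnConsumerClose(c Consumer, err error)
}

type ConsumerInterceptors []ConsumerInterceptor

// OnConsumerClose notifies only those interceptors that opted in.
func (x ConsumerInterceptors) OnConsumerClose(c Consumer, err error) {
	for _, i := range x {
		if ci, ok := i.(ConsumerCloseInterceptor); ok {
			ci.OnConsumerClose(c, err)
		}
	}
}

// multiTopicConsumer is a stand-in for the multi-topic implementation.
type multiTopicConsumer struct {
	name         string
	interceptors ConsumerInterceptors
	closeOnce    sync.Once
}

func (m *multiTopicConsumer) Name() string { return m.name }

// Close fires the hook exactly once, even if called repeatedly.
func (m *multiTopicConsumer) Close() {
	m.closeOnce.Do(func() {
		// Child consumers would be closed here in the real implementation.
		m.interceptors.OnConsumerClose(m, nil) // nil cause: user-initiated close
	})
}

// recorder counts close notifications.
type recorder struct{ calls int }

func (r *recorder) OnAcknowledge(Consumer)          {}
func (r *recorder) OnConsumerClose(Consumer, error) { r.calls++ }

func main() {
	r := &recorder{}
	m := &multiTopicConsumer{name: "multi", interceptors: ConsumerInterceptors{r}}
	m.Close()
	m.Close()
	fmt.Println("close hook calls:", r.calls) // prints "close hook calls: 1"
}
```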
   



##########
pulsar/consumer_partition.go:
##########
@@ -2088,19 +2151,20 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
                        return struct{}{}, nil
                }
                pc.log.WithError(err).Error("Failed to create consumer at reconnect")
-               errMsg := err.Error()
-               if strings.Contains(errMsg, errMsgTopicNotFound) {
-                       // when topic is deleted, we should give up reconnection.
-                       pc.log.Warn("Topic Not Found.")
+               if isNonRetriableSubscribeError(err) {
+                       pc.log.WithError(err).Warn("Non-retriable error during reconnect, giving up")
+                       notifyReconnectGiveUp(err)
                        return struct{}{}, nil
                }
 
                if maxRetry > 0 {
                        maxRetry--
                }
                pc.metrics.ConsumersReconnectFailure.Inc()
-               if maxRetry == 0 || bo.IsMaxBackoffReached() {
+               if maxRetry == 0 {
                        pc.metrics.ConsumersReconnectMaxRetry.Inc()

Review Comment:
   Previously, `ConsumersReconnectMaxRetry` was incremented in two cases:
`maxRetry == 0` or `bo.IsMaxBackoffReached()`. This change removes the
`IsMaxBackoffReached()` branch, so any dashboard or alert that uses this metric
to detect reconnect attempts stuck at the backoff ceiling will silently stop
firing. That matters most in the common case where `MaxReconnectToBroker` is
unset, because this metric was the only signal for "we've been retrying forever
at the ceiling". This is a behavior change worth calling out, and ideally the
metric should still increment under every condition that previously
incremented it.
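
   One way to read this suggestion, as a sketch only: keep the metric on the
old two-part condition and gate the new give-up/close path on `maxRetry == 0`
alone. `reconnectStats` and `onReconnectFailure` are hypothetical names, the
counter is a plain int standing in for the real metric, and a negative
`maxRetry` is assumed to mean "unlimited".

```go
package main

import "fmt"

// reconnectStats is a stand-in for the real metrics and consumer state.
type reconnectStats struct {
	maxRetryMetric int  // stand-in for ConsumersReconnectMaxRetry
	gaveUp         bool // true once the retry budget is exhausted
}

// onReconnectFailure applies the bookkeeping for one failed attempt and
// returns the remaining budget. A negative maxRetry is assumed to mean
// "unlimited" in this sketch.
func onReconnectFailure(maxRetry int, backoffMaxReached bool, s *reconnectStats) int {
	if maxRetry > 0 {
		maxRetry--
	}
	// Metric: same condition as before the PR.
	if maxRetry == 0 || backoffMaxReached {
		s.maxRetryMetric++
	}
	// Give-up: only when the explicit budget is spent.
	if maxRetry == 0 {
		s.gaveUp = true
	}
	return maxRetry
}

func main() {
	s := &reconnectStats{}
	remaining := -1 // MaxReconnectToBroker unset
	for attempt := 1; attempt <= 5; attempt++ {
		// Pretend the backoff ceiling is reached from the third attempt on.
		remaining = onReconnectFailure(remaining, attempt >= 3, s)
	}
	fmt.Println(s.maxRetryMetric, s.gaveUp) // prints "3 false"
}
```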
   



##########
pulsar/consumer_test.go:
##########
@@ -5921,3 +5921,139 @@ func TestSelectConnectionForSameConsumer(t *testing.T) {
                        "The consumer uses a different connection when reconnecting")
        }
 }
+
+// closeInterceptor captures the (consumer, err) pair delivered to
+// ConsumerCloseInterceptor.OnConsumerClose and signals via fired.
+type closeInterceptor struct {
+       fired    chan struct{}
+       consumer Consumer
+       err      error
+       once     sync.Once
+}
+
+func (c *closeInterceptor) BeforeConsume(_ ConsumerMessage)              {}
+func (c *closeInterceptor) OnAcknowledge(_ Consumer, _ MessageID)        {}
+func (c *closeInterceptor) OnNegativeAcksSend(_ Consumer, _ []MessageID) {}
+func (c *closeInterceptor) OnConsumerClose(consumer Consumer, err error) {
+       c.once.Do(func() {
+               c.consumer = consumer
+               c.err = err
+               close(c.fired)
+       })
+}
+
+func TestConsumerOnCloseInterceptorOnMaxReconnect(t *testing.T) {
+       req := testcontainers.ContainerRequest{
+               Image:        getPulsarTestImage(),
+               ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+               WaitingFor:   wait.ForExposedPort(),
+               Cmd:          []string{"bin/pulsar", "standalone", "-nfw", "--advertised-address", "localhost"},
+       }
+       c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
+               ContainerRequest: req,
+               Started:          true,
+       })
+       require.NoError(t, err)
+       t.Cleanup(func() {
+               if err := c.Terminate(context.Background()); err != nil {
+                       t.Logf("container terminate (cleanup) returned: %v", err)
+               }
+       })
+       endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+       require.NoError(t, err)
+
+       pulsarClient, err := NewClient(ClientOptions{
+               URL:               endpoint,
+               ConnectionTimeout: 3 * time.Second,
+               OperationTimeout:  5 * time.Second,
+       })
+       require.NoError(t, err)
+       defer pulsarClient.Close()
+
+       maxRetry := uint(1)
+       interceptor := &closeInterceptor{fired: make(chan struct{})}
+
+       topic := newTopicName()
+       var testConsumer Consumer
+       require.Eventually(t, func() bool {
+               testConsumer, err = pulsarClient.Subscribe(ConsumerOptions{
+                       Topic:                topic,
+                       SubscriptionName:     "test-on-close-interceptor",
+                       MaxReconnectToBroker: &maxRetry,
+                       BackOffPolicyFunc: func() backoff.Policy {
+                               return newTestBackoffPolicy(100*time.Millisecond, 1*time.Second)
+                       },
+                       Interceptors: ConsumerInterceptors{interceptor},
+               })
+               return err == nil
+       }, 30*time.Second, 1*time.Second)
+       defer testConsumer.Close()
+
+       require.NoError(t, c.Terminate(context.Background()))
+
+       select {
+       case <-interceptor.fired:
+       case <-time.After(30 * time.Second):
+               t.Fatal("OnConsumerClose was not called within timeout")
+       }
+
+       assert.NotNil(t, interceptor.err, "interceptor should receive the cause of the close")
+       assert.Equal(t, testConsumer, interceptor.consumer, "interceptor should receive the parent consumer")
+
+       pc := testConsumer.(*consumer).consumers[0]
+       require.Eventually(t, func() bool {
+               return pc.getConsumerState() == consumerClosed
+       }, 30*time.Second, 100*time.Millisecond, "consumer should be closed after exhausting max reconnect retries")
+}

Review Comment:
   `TestConsumerOnCloseInterceptorOnMaxReconnect` starts a real
`testcontainers` Pulsar standalone and shuts it down mid-test to drive the
reconnect path. This makes the test slow (multiple 30s timeouts) and dependent
on Docker being available. The same behavior (the close hook fires once and the
consumer transitions to `consumerClosed` on max-retry exhaustion) can be
exercised with a unit-level test that constructs `partitionConsumerOpts`
directly, as the PR description itself acknowledges ("Unit-level verification
can be done by constructing a `partitionConsumerOpts` directly..."). Consider
replacing or complementing the container-based test with a faster unit test so
the new behavior is covered by CI without requiring a live broker.
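
   To illustrate the general shape of a broker-free test, here is a sketch
that injects the connect step and the close callback into a simplified loop. It
does not use the real `partitionConsumer` or `partitionConsumerOpts`;
`reconnectLoop` and `runScenario` are hypothetical, backoff is omitted, and
`maxRetry` is assumed to be non-negative.

```go
package main

import (
	"errors"
	"fmt"
)

// errDial simulates a connection failure without any network access.
var errDial = errors.New("dial failed")

// reconnectLoop is a simplified stand-in for the reconnect path: it retries
// connect until it succeeds or the budget is spent, then reports the cause
// through onClose. Backoff is left out; maxRetry must be >= 0 here.
func reconnectLoop(maxRetry int, connect func() error, onClose func(error)) {
	for {
		err := connect()
		if err == nil {
			return
		}
		if maxRetry > 0 {
			maxRetry--
		}
		if maxRetry == 0 {
			onClose(fmt.Errorf("reconnect budget exhausted: %w", err))
			return
		}
	}
}

// runScenario drives the loop with a connect function that always fails and
// records how often each callback ran.
func runScenario(maxRetry int) (attempts, closes int, causeWrapsDial bool) {
	connect := func() error {
		attempts++
		return errDial
	}
	onClose := func(cause error) {
		closes++
		causeWrapsDial = errors.Is(cause, errDial)
	}
	reconnectLoop(maxRetry, connect, onClose)
	return attempts, closes, causeWrapsDial
}

func main() {
	attempts, closes, wraps := runScenario(2)
	fmt.Println(attempts, closes, wraps) // prints "2 1 true"
}
```

   The same assertions (hook fired once, non-nil cause) are what the
container-based test checks, but this form runs in microseconds and needs no
Docker.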



##########
pulsar/consumer_test.go:
##########
@@ -5921,3 +5921,139 @@ func TestSelectConnectionForSameConsumer(t *testing.T) {
                        "The consumer uses a different connection when reconnecting")
        }
 }
+
+// closeInterceptor captures the (consumer, err) pair delivered to
+// ConsumerCloseInterceptor.OnConsumerClose and signals via fired.
+type closeInterceptor struct {
+       fired    chan struct{}
+       consumer Consumer
+       err      error
+       once     sync.Once
+}
+
+func (c *closeInterceptor) BeforeConsume(_ ConsumerMessage)              {}
+func (c *closeInterceptor) OnAcknowledge(_ Consumer, _ MessageID)        {}
+func (c *closeInterceptor) OnNegativeAcksSend(_ Consumer, _ []MessageID) {}
+func (c *closeInterceptor) OnConsumerClose(consumer Consumer, err error) {
+       c.once.Do(func() {
+               c.consumer = consumer
+               c.err = err
+               close(c.fired)
+       })
+}
+
+func TestConsumerOnCloseInterceptorOnMaxReconnect(t *testing.T) {
+       req := testcontainers.ContainerRequest{
+               Image:        getPulsarTestImage(),
+               ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+               WaitingFor:   wait.ForExposedPort(),
+               Cmd:          []string{"bin/pulsar", "standalone", "-nfw", "--advertised-address", "localhost"},
+       }
+       c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
+               ContainerRequest: req,
+               Started:          true,
+       })
+       require.NoError(t, err)
+       t.Cleanup(func() {
+               if err := c.Terminate(context.Background()); err != nil {
+                       t.Logf("container terminate (cleanup) returned: %v", err)
+               }
+       })
+       endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+       require.NoError(t, err)
+
+       pulsarClient, err := NewClient(ClientOptions{
+               URL:               endpoint,
+               ConnectionTimeout: 3 * time.Second,
+               OperationTimeout:  5 * time.Second,
+       })
+       require.NoError(t, err)
+       defer pulsarClient.Close()
+
+       maxRetry := uint(1)
+       interceptor := &closeInterceptor{fired: make(chan struct{})}
+
+       topic := newTopicName()
+       var testConsumer Consumer
+       require.Eventually(t, func() bool {
+               testConsumer, err = pulsarClient.Subscribe(ConsumerOptions{
+                       Topic:                topic,
+                       SubscriptionName:     "test-on-close-interceptor",
+                       MaxReconnectToBroker: &maxRetry,
+                       BackOffPolicyFunc: func() backoff.Policy {
+                               return newTestBackoffPolicy(100*time.Millisecond, 1*time.Second)
+                       },
+                       Interceptors: ConsumerInterceptors{interceptor},
+               })
+               return err == nil
+       }, 30*time.Second, 1*time.Second)
+       defer testConsumer.Close()
+
+       require.NoError(t, c.Terminate(context.Background()))
+
+       select {
+       case <-interceptor.fired:
+       case <-time.After(30 * time.Second):
+               t.Fatal("OnConsumerClose was not called within timeout")
+       }
+
+       assert.NotNil(t, interceptor.err, "interceptor should receive the cause of the close")
+       assert.Equal(t, testConsumer, interceptor.consumer, "interceptor should receive the parent consumer")
+
+       pc := testConsumer.(*consumer).consumers[0]
+       require.Eventually(t, func() bool {
+               return pc.getConsumerState() == consumerClosed
+       }, 30*time.Second, 100*time.Millisecond, "consumer should be closed after exhausting max reconnect retries")
+}
+
+func TestConsumerOnCloseInterceptorOnUserClose(t *testing.T) {
+       client, err := NewClient(ClientOptions{URL: serviceURL})
+       require.NoError(t, err)
+       defer client.Close()
+
+       interceptor := &closeInterceptor{fired: make(chan struct{})}
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            newTopicName(),
+               SubscriptionName: "test-on-close-user",
+               Interceptors:     ConsumerInterceptors{interceptor},
+       })
+       require.NoError(t, err)
+
+       consumer.Close()
+
+       select {
+       case <-interceptor.fired:
+       case <-time.After(5 * time.Second):
+               t.Fatal("OnConsumerClose was not called within timeout")
+       }
+
+       assert.Nil(t, interceptor.err, "user-initiated close should report nil cause")
+       assert.Equal(t, consumer, interceptor.consumer)
+}
+
+func TestIsNonRetriableSubscribeError(t *testing.T) {
+       cases := []struct {
+               name string
+               err  error
+               want bool
+       }{
+               {"nil", nil, false},
+               {"topic not found", errors.New("TopicNotFound: topic does not exist"), true},
+               {"topic terminated", errors.New("TopicTerminatedError: topic was terminated"), true},
+               {"subscription not found", errors.New("SubscriptionNotFound: sub does not exist"), true},
+               {"authorization", errors.New("AuthorizationError: not authorized"), true},
+               {"consumer busy", errors.New("ConsumerBusy: another consumer attached"), true},
+               {"invalid topic name", errors.New("InvalidTopicName: bad name"), true},
+               {"incompatible schema", errors.New("IncompatibleSchema: schema mismatch"), true},
+               {"consumer assign error", errors.New("ConsumerAssignError: dispatcher assign failed"), true},
+               {"not allowed", errors.New("NotAllowedError: action not permitted"), true},
+               {"service not ready (retriable)", errors.New("ServiceNotReady: please retry"), false},
+               {"metadata error (retriable)", errors.New("MetadataError: zk timeout"), false},
+               {"plain network error (retriable)", errors.New("dial tcp: i/o timeout"), false},
+       }
+       for _, tc := range cases {
+               t.Run(tc.name, func(t *testing.T) {
+                       assert.Equal(t, tc.want, isNonRetriableSubscribeError(tc.err))
+               })
+       }
+}

Review Comment:
   `TestIsNonRetriableSubscribeError` only exercises happy-path strings
("AuthorizationError: ...", etc.). It doesn't cover the substring-collision
edge cases that the implementation is vulnerable to, e.g. a retriable error
whose free-form message contains the literal text "AuthorizationError" or
"ConsumerBusy". Adding such cases would make the test more meaningful and would
expose false-positive matches.
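
   As an illustration of the kind of cases meant here, the sketch below runs
two made-up collision messages through a copy of the substring check from the
diff. The message texts are invented for the example and are not real broker
output; `containsMarker`, `collisionCases` and `failingCases` are hypothetical
names, and only a subset of the markers is included.

```go
package main

import (
	"fmt"
	"strings"
)

// markers is a subset of the PR's marker list, enough to show the problem.
var markers = []string{"AuthorizationError", "ConsumerBusy", "NotAllowedError"}

// containsMarker mirrors the substring check from the diff.
func containsMarker(msg string) bool {
	for _, m := range markers {
		if strings.Contains(msg, m) {
			return true
		}
	}
	return false
}

// collisionCase pairs an error message with the desired classification.
type collisionCase struct {
	name string
	msg  string
	want bool
}

// collisionCases are retriable errors whose body mentions a marker.
// The texts are illustrative only.
var collisionCases = []collisionCase{
	{"body mentions AuthorizationError", "ServiceNotReady: last attempt hit AuthorizationError", false},
	{"body mentions ConsumerBusy", "MetadataError: lookup failed while ConsumerBusy", false},
}

// failingCases returns the names of cases where check disagrees with want.
func failingCases(check func(string) bool) []string {
	var failed []string
	for _, tc := range collisionCases {
		if check(tc.msg) != tc.want {
			failed = append(failed, tc.name)
		}
	}
	return failed
}

func main() {
	// With the substring check, both cases are reported as false positives.
	for _, name := range failingCases(containsMarker) {
		fmt.Println("false positive:", name)
	}
}
```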



##########
pulsar/consumer.go:
##########
@@ -240,6 +240,11 @@ type ConsumerOptions struct {
        Schema Schema
 
        // MaxReconnectToBroker sets the maximum retry number of 
reconnectToBroker. (default: ultimate)
+       // When the retry budget is exhausted, or when the broker reports a 
non-retriable error
+       // (e.g. AuthorizationError, TopicNotFound, TopicTerminated, 
IncompatibleSchema), the
+       // consumer is closed. Applications can observe the close — and recover 
the cause as the
+       // err argument — by registering a ConsumerInterceptor that also 
implements

Review Comment:
   This doc update describes a significant behavior change (the consumer is
now always closed when `MaxReconnectToBroker` is exhausted or a non-retriable
error is reported), but the change is buried in a paragraph about the recovery
callback. Users who previously set `MaxReconnectToBroker` to bound retries
(knowing the consumer would stay alive and stop retrying) will now find their
consumer auto-closed. Please call out the behavior change explicitly (e.g.
"Setting this option now also causes the consumer to be closed when the retry
budget is exhausted") and consider mentioning it in release notes/CHANGELOG.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
