nodece commented on code in PR #1490:
URL: https://github.com/apache/pulsar-client-go/pull/1490#discussion_r3245703190


##########
pulsar/consumer.go:
##########
@@ -222,6 +222,28 @@ type ConsumerOptions struct {
        // MaxReconnectToBroker sets the maximum retry number of 
reconnectToBroker. (default: ultimate)
        MaxReconnectToBroker *uint
 
+       // MaxReconnectToBrokerListener is called when the consumer gives up on 
reconnecting to the
+       // broker. The consumer argument is the parent consumer, and err is the 
last connection error.
+       // Use this callback to detect silent failure and take recovery action 
(e.g. recreate the
+       // consumer). The callback fires at most once per reconnect cycle, in 
either of two cases:
+       //   1. The retry budget set by MaxReconnectToBroker is exhausted.
+       //   2. The broker reports a non-retriable error (e.g. 
AuthorizationError, TopicNotFound,
+       //      TopicTerminated, IncompatibleSchema). In this case the listener 
fires regardless of
+       //      whether MaxReconnectToBroker was set, since retrying cannot 
recover.
+       // This callback is invoked from the partition consumer event loop, so 
applications must not
+       // call consumer.Close() synchronously from within the callback — doing 
so can deadlock. If
+       // closing is required, do it asynchronously (for example, in another 
goroutine), or enable
+       // CloseConsumerOnMaxReconnectToBroker to let the client close the 
consumer safely after the
+       // callback returns.
+       MaxReconnectToBrokerListener func(consumer Consumer, err error)

Review Comment:
   I suggest adding this feature to `ConsumerInterceptor`, similar to 
`OnConsumerClose`.
   



##########
pulsar/consumer_partition.go:
##########
@@ -2087,19 +2146,19 @@ func (pc *partitionConsumer) 
reconnectToBroker(connectionClosed *connectionClose
                        return struct{}{}, nil
                }
                pc.log.WithError(err).Error("Failed to create consumer at 
reconnect")
-               errMsg := err.Error()
-               if strings.Contains(errMsg, errMsgTopicNotFound) {
-                       // when topic is deleted, we should give up 
reconnection.
-                       pc.log.Warn("Topic Not Found.")
+               if isNonRetriableSubscribeError(err) {
+                       pc.log.WithError(err).Warn("Non-retriable error during 
reconnect, giving up")
+                       notifyReconnectGiveUp(err)
                        return struct{}{}, nil
                }
 
                if maxRetry > 0 {
                        maxRetry--
                }
                pc.metrics.ConsumersReconnectFailure.Inc()
-               if maxRetry == 0 || bo.IsMaxBackoffReached() {
+               if maxRetry == 0 {
                        pc.metrics.ConsumersReconnectMaxRetry.Inc()
+                       notifyReconnectGiveUp(err)

Review Comment:
   ```suggestion
                        notifyReconnectGiveUp(errors.New("max retry attempts 
reached for reconnecting to broker"))
                        return struct{}{}, nil
   ```



##########
pulsar/consumer.go:
##########
@@ -222,6 +222,28 @@ type ConsumerOptions struct {
        // MaxReconnectToBroker sets the maximum retry number of 
reconnectToBroker. (default: ultimate)
        MaxReconnectToBroker *uint
 
+       // MaxReconnectToBrokerListener is called when the consumer gives up on 
reconnecting to the
+       // broker. The consumer argument is the parent consumer, and err is the 
last connection error.
+       // Use this callback to detect silent failure and take recovery action 
(e.g. recreate the
+       // consumer). The callback fires at most once per reconnect cycle, in 
either of two cases:
+       //   1. The retry budget set by MaxReconnectToBroker is exhausted.
+       //   2. The broker reports a non-retriable error (e.g. 
AuthorizationError, TopicNotFound,
+       //      TopicTerminated, IncompatibleSchema). In this case the listener 
fires regardless of
+       //      whether MaxReconnectToBroker was set, since retrying cannot 
recover.
+       // This callback is invoked from the partition consumer event loop, so 
applications must not
+       // call consumer.Close() synchronously from within the callback — doing 
so can deadlock. If
+       // closing is required, do it asynchronously (for example, in another 
goroutine), or enable
+       // CloseConsumerOnMaxReconnectToBroker to let the client close the 
consumer safely after the
+       // callback returns.
+       MaxReconnectToBrokerListener func(consumer Consumer, err error)
+
+       // CloseConsumerOnMaxReconnectToBroker, when true, automatically closes 
the consumer after
+       // the reconnect loop gives up (either MaxReconnectToBroker exhausted 
or a non-retriable
+       // broker error was received). The close happens asynchronously after
+       // MaxReconnectToBrokerListener (if set) returns, and is the 
recommended option when the
+       // consumer should be closed after reconnect failure. Default: false.
+       CloseConsumerOnMaxReconnectToBroker bool

Review Comment:
   This config is unnecessary; the value should default to `true`.



##########
pulsar/consumer_partition.go:
##########
@@ -2061,6 +2106,20 @@ func (pc *partitionConsumer) 
reconnectToBroker(connectionClosed *connectionClose
                assignedBrokerURL = connectionClosed.assignedBrokerURL
        }
 
+       var giveUpNotified bool
+       notifyReconnectGiveUp := func(cause error) {
+               if giveUpNotified {
+                       return
+               }
+               giveUpNotified = true
+               if pc.options.maxReconnectToBrokerListener != nil {
+                       
pc.options.maxReconnectToBrokerListener(pc.parentConsumer, cause)
+               }
+               if pc.options.closeConsumerOnMaxReconnectToBroker {
+                       go pc.parentConsumer.Close()
+               }
+       }

Review Comment:
   ```suggestion
        var giveUpNotified bool
        notifyReconnectGiveUp := func(cause error) {
                if giveUpNotified {
                        return
                }
                giveUpNotified = true
           go pc.parentConsumer.Close()
        }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to