geniusjoe opened a new pull request, #1457:
URL: https://github.com/apache/pulsar-client-go/pull/1457

   Fixes https://github.com/apache/pulsar-client-go/pull/1134
   Master Issue: https://github.com/apache/pulsar-client-go/pull/1128
   
   ### Motivation
   Currently, the Go client handles connections in two ways:
   1. On the initial connection, that is, when the producer is first 
initialized, the `grabCnx()` method does not trigger retry logic. If an error 
occurs, it is returned directly to the caller.
   2. If the connection is already active, the `runEventsLoop()` method 
detects the disconnection and calls `reconnectToBroker()` to re-establish the 
connection. During reconnection, the method checks whether the error is 
retryable. If it is, the `internal.Retry()` call inside `reconnectToBroker()` 
retries indefinitely according to the backoff strategy. If it is not, as is 
currently the case for `errMsgProducerBlockedQuotaExceededException`, the 
partition producer is closed, and any subsequent messages sent to that 
partition fail with a `SendTimeoutError`.
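
The second case above can be illustrated with a minimal, self-contained simulation. This is a sketch only, not the client's actual code: `reconnectCurrent`, `reconnectOutcome`, `errTransient`, and `errQuota` are hypothetical names, the marker string is an assumed value, and the loop is bounded by `maxAttempts` and skips the backoff delay so that it terminates quickly.

```go
package main

import (
	"errors"
	"strings"
)

// Assumed marker text; the real constant lives in the client and may differ.
const errMsgProducerBlockedQuotaExceededException = "ProducerBlockedQuotaExceededException"

// Hypothetical errors used to drive the simulation.
var (
	errTransient = errors.New("connection refused")
	errQuota     = errors.New("server error: " + errMsgProducerBlockedQuotaExceededException)
)

// reconnectOutcome records what the simulated loop did.
type reconnectOutcome struct {
	attempts       int
	connected      bool
	producerClosed bool
}

// reconnectCurrent models the behavior described above: retryable errors are
// retried, while a quota-exceeded error stops the loop and closes the producer.
// connect is a hypothetical stand-in for the connection attempt.
func reconnectCurrent(connect func(attempt int) error, maxAttempts int) reconnectOutcome {
	var out reconnectOutcome
	for out.attempts < maxAttempts {
		out.attempts++
		err := connect(out.attempts)
		if err == nil {
			out.connected = true
			return out
		}
		if strings.Contains(err.Error(), errMsgProducerBlockedQuotaExceededException) {
			// Non-retryable today: stop reconnecting and close the producer.
			out.producerClosed = true
			return out
		}
		// Retryable: the real client would wait for the backoff delay here.
	}
	return out
}
```

Calling `reconnectCurrent` with a `connect` function that returns `errTransient` twice and then `errQuota` ends on the third attempt with `producerClosed` set, which is the situation this pull request aims to avoid.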
   
   Since the `backlogQuotaExceedException` can be resolved by modifying the TTL 
or increasing consumption speed, it is likely to recover after some retry 
attempts. I believe the exception handling for 
`errMsgProducerBlockedQuotaExceededException` in the `reconnectToBroker()` 
method should align as closely as possible with the Java implementation. Java’s 
current retry logic for connections is as follows:
   1. If an error is received after sending a message or a heartbeat times out, 
the callback function `handleSendError()` is triggered to close the connection.
   2. When the connection is closed, the `connectionClosed` method is called, 
which in turn invokes the `grabCnx()` method, and subsequently `grabCnx()` 
calls the `connectionOpened()` method to reconnect:
   ```Java
   // Schedule a reconnection task
   state.client.timer().newTimeout(timeout -> {
       log.info("[{}] [{}] Reconnecting after timeout", state.topic,
               state.getHandlerName());
       grabCnx();  // Re-establish the connection
   }, delayMs, TimeUnit.MILLISECONDS);
   ```
   3. If the connection still fails in the `connectionOpened()` method, the 
client executes different logic depending on the exception. The Java client 
does not classify `ProducerBlockedQuotaExceededException` as unrecoverable 
under `isUnrecoverableError`. Instead, it first cleans up the pending messages 
and then attempts to reconnect.
   
   ### Modifications
   To maintain consistency with the Java client, I suggest that when 
`errMsgProducerBlockedQuotaExceededException` is encountered in 
`reconnectToBroker()`, only `failPendingMessages()` should be executed, but the 
connection should not be closed. Specifically, the code should be modified as 
follows:
   ```Go
   if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) {
       p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, stop reconnecting")
       p.failPendingMessages(errors.Join(ErrProducerBlockedQuotaExceeded, err))
       // Do not close the connection here
       // return struct{}{}, err
   }
   ```
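
To show the intended effect in isolation, here is a minimal simulation of the proposed behavior. It is a sketch under stated assumptions rather than the client's implementation: `fakeProducer`, its `reconnect` method, and `errQuotaFromBroker` are hypothetical, `ErrProducerBlockedQuotaExceeded` is redefined locally as a stand-in, the marker string is an assumed value, and the backoff delay is omitted.

```go
package main

import (
	"errors"
	"strings"
)

// Assumed marker text; the real constant lives in the client and may differ.
const errMsgProducerBlockedQuotaExceededException = "ProducerBlockedQuotaExceededException"

// Local stand-in for the client's exported error of the same name.
var ErrProducerBlockedQuotaExceeded = errors.New("producer blocked: backlog quota exceeded")

// errQuotaFromBroker is a hypothetical broker error carrying the marker text.
var errQuotaFromBroker = errors.New("server error: " + errMsgProducerBlockedQuotaExceededException)

// fakeProducer is a hypothetical, heavily simplified partition producer.
type fakeProducer struct {
	pending []string // messages waiting to be sent
	failed  []error  // one entry per message that was failed
}

// failPendingMessages fails every pending message with err and clears the queue.
func (p *fakeProducer) failPendingMessages(err error) {
	for range p.pending {
		p.failed = append(p.failed, err)
	}
	p.pending = nil
}

// reconnect retries connect up to maxAttempts times. On a quota-exceeded error
// it fails the pending messages but keeps retrying instead of giving up.
func (p *fakeProducer) reconnect(connect func(attempt int) error, maxAttempts int) (int, error) {
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		err := connect(attempt)
		if err == nil {
			return attempt, nil
		}
		lastErr = err
		if strings.Contains(err.Error(), errMsgProducerBlockedQuotaExceededException) {
			p.failPendingMessages(errors.Join(ErrProducerBlockedQuotaExceeded, err))
			// No return here: the loop continues, so the producer can recover
			// once the backlog drains.
		}
		// The real client would wait for the backoff delay before retrying.
	}
	return maxAttempts, lastErr
}
```

With two pending messages and a `connect` function that returns `errQuotaFromBroker` twice before succeeding, `reconnect` returns on the third attempt with a nil error, and both messages have been failed with an error that wraps `ErrProducerBlockedQuotaExceeded`.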
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   This change added tests and can be verified as follows:
   `TestProducerReconnectWhenBacklogQuotaExceed`
   
   ### Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (no)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (no)
   

