geniusjoe opened a new pull request, #1457: URL: https://github.com/apache/pulsar-client-go/pull/1457
Fixes https://github.com/apache/pulsar-client-go/pull/1134
Master Issue: https://github.com/apache/pulsar-client-go/pull/1128

### Motivation

Currently, the Go client handles connections in two ways:

1. If the current connection is an initialization connection (i.e. the producer is being initialized for the first time), the `grabCnx()` method does not trigger any retry logic; if an error occurs, it is returned directly to the caller.
2. If the current connection is an already active connection, the `runEventsLoop()` method detects the disconnection and calls `reconnectToBroker()` to re-establish the connection. During reconnection, the method determines whether the current error is retryable. If it is, `internal.Retry()` inside `reconnectToBroker()` retries indefinitely according to the backoff strategy. If it is non-retryable, such as the current `errMsgProducerBlockedQuotaExceededException`, the partition producer is closed, causing all subsequent messages sent to that partition to fail with a `SendTimeoutError`.

Since a `backlogQuotaExceedException` can be resolved by modifying the TTL or increasing consumption speed, it is likely to clear after some retry attempts. I believe the handling of `errMsgProducerBlockedQuotaExceededException` in `reconnectToBroker()` should align as closely as possible with the Java implementation.

Java's current retry logic for connections is as follows:

1. If an error is received after sending a message, or a heartbeat times out, the callback function `handleSendError()` is triggered to close the connection.
2. When the connection is closed, the `connectionClosed` method is called, which in turn invokes the `grabCnx()` method, and `grabCnx()` subsequently calls the `connectionOpened()` method to reconnect:

   ```Java
   // Schedule a reconnection task
   state.client.timer().newTimeout(timeout -> {
       log.info("[{}] [{}] Reconnecting after timeout", state.topic, state.getHandlerName());
       grabCnx(); // Re-establish the connection
   }, delayMs, TimeUnit.MILLISECONDS);
   ```

3. If the connection still fails in the `connectionOpened()` method, the client executes different logic based on the current exception. For the `ProducerBlockedQuotaExceededException` exception, the Java client does not treat it as an `isUnrecoverableError`; instead, it first cleans up the pending messages and then attempts to reconnect.

### Modifications

To maintain consistency with the Java client, I suggest that when `errMsgProducerBlockedQuotaExceededException` is encountered in `reconnectToBroker()`, only `failPendingMessages()` should be executed, and the connection should not be closed. Specifically, the code should be modified as follows:

```Go
if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) {
    p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, stop reconnecting")
    p.failPendingMessages(errors.Join(ErrProducerBlockedQuotaExceeded, err))
    // Do not close the connection here
    // return struct{}{}, err
}
```

### Verifying this change

- [x] Make sure that the change passes the CI checks.

This change added tests and can be verified as follows: `TestProducerReconnectWhenBacklogQuotaExceed`

### Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API: (no)
- The schema: (no)
- The default values of configurations: (no)
- The wire protocol: (no)

### Documentation

- Does this pull request introduce a new feature?
(no)
