This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new e7a771fe [improve] return `ErrMaxConcurrentOpsReached` when too many concurrent ops in transaction coordinator client (#1242) e7a771fe is described below commit e7a771fe8bab8b2790620848e18feabcde1056e7 Author: Zike Yang <z...@apache.org> AuthorDate: Thu Jul 11 16:08:05 2024 +0800 [improve] return `ErrMaxConcurrentOpsReached` when too many concurrent ops in transaction coordinator client (#1242) ### Motivation Currently, the client returns an UnknownError when there are too many concurrent ops in the transaction coordinator client. ### Modifications - Add new error `ErrMaxConcurrentOpsReached` - Return `ErrMaxConcurrentOpsReached` when there are too many concurrent ops in the transaction coordinator client --- pulsar/error.go | 4 ++++ pulsar/transaction_coordinator_client.go | 3 ++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/pulsar/error.go b/pulsar/error.go index ccadb724..1a9345b0 100644 --- a/pulsar/error.go +++ b/pulsar/error.go @@ -121,6 +121,10 @@ const ( // fenced. Applications are now supposed to close it and create a // new producer ProducerFenced + // MaxConcurrentOperationsReached indicates that the maximum number of concurrent operations + // has been reached. This means that no additional operations can be started until some + // of the current operations complete. + MaxConcurrentOperationsReached // TransactionCoordinatorNotEnabled indicates that the transaction coordinator is not enabled. 
// This error is returned when an operation that requires the transaction coordinator is attempted // but the transaction coordinator feature is not enabled in the system or the transaction coordinator diff --git a/pulsar/transaction_coordinator_client.go b/pulsar/transaction_coordinator_client.go index d402d7ec..c4b7a6a2 100644 --- a/pulsar/transaction_coordinator_client.go +++ b/pulsar/transaction_coordinator_client.go @@ -41,6 +41,7 @@ type transactionCoordinatorClient struct { // where the TC located. const TransactionCoordinatorAssign = "persistent://pulsar/system/transaction_coordinator_assign" +var ErrMaxConcurrentOpsReached = newError(MaxConcurrentOperationsReached, "Max concurrent operations reached") var ErrTransactionCoordinatorNotEnabled = newError(TransactionCoordinatorNotEnabled, "The broker doesn't enable "+ "the transaction coordinator, or the transaction coordinator has not initialized") @@ -212,7 +213,7 @@ func getTCAssignTopicName(partition uint64) string { func (tc *transactionCoordinatorClient) canSendRequest() error { if !tc.semaphore.Acquire(context.Background()) { - return newError(UnknownError, "Failed to acquire semaphore") + return ErrMaxConcurrentOpsReached } return nil }