This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 4e71a47a [fix] [client] fix same producer/consumer use more than one
connection per broker (#1323)
4e71a47a is described below
commit 4e71a47a4f6174f883905e740ba35f2498494ed1
Author: Baodi Shi <[email protected]>
AuthorDate: Fri Jan 3 09:55:49 2025 +0800
[fix] [client] fix same producer/consumer use more than one connection per
broker (#1323)
* [fix] [client] fix same producer/consumer use more than one connection
per broker
* Fix lint
* Apply suggestions from code review
Co-authored-by: Zike Yang <[email protected]>
* Address comment
* Addressed comment
* make lint
---------
Co-authored-by: Zike Yang <[email protected]>
---
pulsar/consumer_partition.go | 8 +++++---
pulsar/consumer_test.go | 28 +++++++++++++++++++++++++++
pulsar/internal/connection_pool.go | 33 ++++++++++++++++----------------
pulsar/internal/lookup_service_test.go | 12 ++++++++++++
pulsar/internal/rpc_client.go | 11 ++++++++++-
pulsar/producer_partition.go | 6 ++++--
pulsar/producer_test.go | 31 ++++++++++++++++++++++++++++++
pulsar/transaction_coordinator_client.go | 6 ++++--
8 files changed, 111 insertions(+), 24 deletions(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 520d9e8d..24ffa401 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -141,7 +141,8 @@ type partitionConsumer struct {
state uAtomic.Int32
options *partitionConsumerOpts
- conn atomic.Pointer[internal.Connection]
+ conn atomic.Pointer[internal.Connection]
+ cnxKeySuffix int32
topic string
name string
@@ -351,6 +352,7 @@ func newPartitionConsumer(parent Consumer, client *client,
options *partitionCon
parentConsumer: parent,
client: client,
options: options,
+ cnxKeySuffix:
client.cnxPool.GenerateRoundRobinIndex(),
topic: options.topic,
name: options.consumerName,
consumerID: client.rpcClient.NewConsumerID(),
@@ -1964,7 +1966,7 @@ func (pc *partitionConsumer) grabConn(assignedBrokerURL
string) error {
cmdSubscribe.ForceTopicCreation = proto.Bool(false)
}
- res, err := pc.client.rpcClient.Request(lr.LogicalAddr,
lr.PhysicalAddr, requestID,
+ res, err := pc.client.rpcClient.RequestWithCnxKeySuffix(lr.LogicalAddr,
lr.PhysicalAddr, pc.cnxKeySuffix, requestID,
pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
if err != nil {
@@ -1975,7 +1977,7 @@ func (pc *partitionConsumer) grabConn(assignedBrokerURL
string) error {
ConsumerId: proto.Uint64(pc.consumerID),
RequestId: proto.Uint64(requestID),
}
- _, _ = pc.client.rpcClient.Request(lr.LogicalAddr,
lr.PhysicalAddr, requestID,
+ _, _ =
pc.client.rpcClient.RequestWithCnxKeySuffix(lr.LogicalAddr, lr.PhysicalAddr,
pc.cnxKeySuffix, requestID,
pb.BaseCommand_CLOSE_CONSUMER, cmdClose)
}
return err
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 3445f425..33e4d057 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -5038,3 +5038,31 @@ func TestClientVersion(t *testing.T) {
assert.True(t, strings.HasSuffix(publisher.ClientVersion,
"-test-client"))
}
+
+func TestSelectConnectionForSameConsumer(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ MaxConnectionsPerBroker: 10,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ topicName := newTopicName()
+
+ _consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "sub-1",
+ Type: Shared,
+ })
+ assert.NoError(t, err)
+ defer _consumer.Close()
+
+ partitionConsumerImpl := _consumer.(*consumer).consumers[0]
+ conn := partitionConsumerImpl._getConn()
+
+ for i := 0; i < 5; i++ {
+ assert.NoError(t, partitionConsumerImpl.grabConn(""))
+ assert.Equal(t, conn.ID(),
partitionConsumerImpl._getConn().ID(),
+ "The consumer uses a different connection when
reconnecting")
+ }
+}
diff --git a/pulsar/internal/connection_pool.go
b/pulsar/internal/connection_pool.go
index ee583825..cd082188 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -32,11 +32,14 @@ import (
// ConnectionPool is a interface of connection pool.
type ConnectionPool interface {
// GetConnection get a connection from ConnectionPool.
- GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection,
error)
+ GetConnection(logicalAddr *url.URL, physicalAddr *url.URL, keySuffix
int32) (Connection, error)
// GetConnections get all connections in the pool.
GetConnections() map[string]Connection
+ // GenerateRoundRobinIndex generates a round-robin index.
+ GenerateRoundRobinIndex() int32
+
// Close all the connections in the pool
Close()
}
@@ -47,8 +50,8 @@ type connectionPool struct {
connectionTimeout time.Duration
tlsOptions *TLSOptions
auth auth.Provider
- maxConnectionsPerHost int32
- roundRobinCnt int32
+ maxConnectionsPerHost uint32
+ roundRobinCnt uint32
keepAliveInterval time.Duration
closeCh chan struct{}
@@ -73,7 +76,7 @@ func NewConnectionPool(
tlsOptions: tlsOptions,
auth: auth,
connectionTimeout: connectionTimeout,
- maxConnectionsPerHost: int32(maxConnectionsPerHost),
+ maxConnectionsPerHost: uint32(maxConnectionsPerHost),
keepAliveInterval: keepAliveInterval,
log: logger,
metrics: metrics,
@@ -84,9 +87,12 @@ func NewConnectionPool(
return p
}
-func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr
*url.URL) (Connection, error) {
- p.log.WithField("logicalAddr", logicalAddr).WithField("physicalAddr",
physicalAddr).Debug("Getting pooled connection")
- key := p.getMapKey(logicalAddr, physicalAddr)
+func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr
*url.URL,
+ keySuffix int32) (Connection, error) {
+ p.log.WithField("logicalAddr", logicalAddr).
+ WithField("physicalAddr", physicalAddr).
+ WithField("keySuffix", keySuffix).Debug("Getting pooled
connection")
+ key := fmt.Sprint(logicalAddr.Host, "-", physicalAddr.Host, "-",
keySuffix)
p.Lock()
conn, ok := p.connections[key]
@@ -141,6 +147,10 @@ func (p *connectionPool) GetConnections()
map[string]Connection {
return conns
}
+func (p *connectionPool) GenerateRoundRobinIndex() int32 {
+ return int32(atomic.AddUint32(&p.roundRobinCnt, 1) %
p.maxConnectionsPerHost)
+}
+
func (p *connectionPool) Close() {
p.Lock()
close(p.closeCh)
@@ -151,15 +161,6 @@ func (p *connectionPool) Close() {
p.Unlock()
}
-func (p *connectionPool) getMapKey(logicalAddr *url.URL, physicalAddr
*url.URL) string {
- cnt := atomic.AddInt32(&p.roundRobinCnt, 1)
- if cnt < 0 {
- cnt = -cnt
- }
- idx := cnt % p.maxConnectionsPerHost
- return fmt.Sprint(logicalAddr.Host, "-", physicalAddr.Host, "-", idx)
-}
-
func (p *connectionPool) checkAndCleanIdleConnections(maxIdleTime
time.Duration) {
if maxIdleTime < 0 {
return
diff --git a/pulsar/internal/lookup_service_test.go
b/pulsar/internal/lookup_service_test.go
index 733586c3..4692f186 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -108,6 +108,12 @@ func (c *mockedLookupRPCClient) Request(logicalAddr
*url.URL, physicalAddr *url.
}, nil
}
+func (c *mockedLookupRPCClient) RequestWithCnxKeySuffix(_ *url.URL, _ *url.URL,
+ _ int32, _ uint64, _ pb.BaseCommand_Type, _ proto.Message) (*RPCResult,
error) {
+ assert.Fail(c.t, "Shouldn't be called")
+ return nil, nil
+}
+
func (c *mockedLookupRPCClient) RequestOnCnx(_ Connection, _ uint64, _
pb.BaseCommand_Type,
_ proto.Message) (*RPCResult, error) {
assert.Fail(c.t, "Shouldn't be called")
@@ -492,6 +498,12 @@ func (m mockedPartitionedTopicMetadataRPCClient) Request(_
*url.URL, _ *url.URL,
return nil, nil
}
+func (m *mockedPartitionedTopicMetadataRPCClient) RequestWithCnxKeySuffix(_
*url.URL, _ *url.URL,
+ _ int32, _ uint64, _ pb.BaseCommand_Type, _ proto.Message) (*RPCResult,
error) {
+ assert.Fail(m.t, "Shouldn't be called")
+ return nil, nil
+}
+
func (m mockedPartitionedTopicMetadataRPCClient) RequestOnCnxNoWait(_
Connection, _ pb.BaseCommand_Type,
_ proto.Message) error {
assert.Fail(m.t, "Shouldn't be called")
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 9a88a167..342fdedd 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -64,6 +64,9 @@ type RPCClient interface {
RequestToHost(serviceNameResolver *ServiceNameResolver, requestID
uint64,
cmdType pb.BaseCommand_Type, message proto.Message)
(*RPCResult, error)
+ RequestWithCnxKeySuffix(logicalAddr *url.URL, physicalAddr *url.URL,
cnxKeySuffix int32, requestID uint64,
+ cmdType pb.BaseCommand_Type, message proto.Message)
(*RPCResult, error)
+
Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message)
(*RPCResult, error)
@@ -154,7 +157,13 @@ func (c *rpcClient) RequestToHost(serviceNameResolver
*ServiceNameResolver, requ
func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL,
requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
{
- cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
+ return c.RequestWithCnxKeySuffix(logicalAddr, physicalAddr,
c.pool.GenerateRoundRobinIndex(),
+ requestID, cmdType, message)
+}
+
+func (c *rpcClient) RequestWithCnxKeySuffix(logicalAddr *url.URL, physicalAddr
*url.URL, cnxKeySuffix int32,
+ requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message)
(*RPCResult, error) {
+ cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr,
cnxKeySuffix)
if err != nil {
return nil, err
}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index d83cf064..4b950dff 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -96,7 +96,8 @@ type partitionProducer struct {
topic string
log log.Logger
- conn uAtomic.Value
+ conn uAtomic.Value
+ cnxKeySuffix int32
options *ProducerOptions
producerName string
@@ -179,6 +180,7 @@ func newPartitionProducer(client *client, topic string,
options *ProducerOptions
client: client,
topic: topic,
log: logger,
+ cnxKeySuffix: client.cnxPool.GenerateRoundRobinIndex(),
options: options,
producerID: client.rpcClient.NewProducerID(),
dataChan: make(chan *sendRequest, maxPendingMessages),
@@ -301,7 +303,7 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL
string) error {
cmdProducer.Metadata = toKeyValues(p.options.Properties)
}
- cnx, err := p.client.cnxPool.GetConnection(lr.LogicalAddr,
lr.PhysicalAddr)
+ cnx, err := p.client.cnxPool.GetConnection(lr.LogicalAddr,
lr.PhysicalAddr, p.cnxKeySuffix)
// registering the producer first in case broker sends commands in the
middle
if err != nil {
p.log.Error("Failed to get connection")
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 24939a82..3af4a89c 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -2574,3 +2574,34 @@ func TestProducerKeepReconnectingAndThenCallClose(t
*testing.T) {
return true
}, 30*time.Second, 1*time.Second)
}
+
+func TestSelectConnectionForSameProducer(t *testing.T) {
+ topicName := newTopicName()
+
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ MaxConnectionsPerBroker: 10,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ reconnectNum := uint(1)
+ _producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ MaxReconnectToBroker: &reconnectNum,
+ })
+ assert.NoError(t, err)
+ defer _producer.Close()
+
+ partitionProducerImp :=
_producer.(*producer).producers[0].(*partitionProducer)
+ conn := partitionProducerImp._getConn()
+
+ for i := 0; i < 5; i++ {
+ partitionProducerImp.grabCnx("")
+ currentConn := partitionProducerImp._getConn()
+ assert.Equal(t, conn.ID(), currentConn.ID(),
+ "The producer uses a different connection when
reconnecting")
+ }
+
+ client.Close()
+}
diff --git a/pulsar/transaction_coordinator_client.go
b/pulsar/transaction_coordinator_client.go
index e66b06ef..9382da6e 100644
--- a/pulsar/transaction_coordinator_client.go
+++ b/pulsar/transaction_coordinator_client.go
@@ -46,6 +46,7 @@ type transactionHandler struct {
tc *transactionCoordinatorClient
state uAtomic.Int32
conn uAtomic.Value
+ cnxKeySuffix int32
partition uint64
closeCh chan any
requestCh chan any
@@ -67,6 +68,7 @@ func (t *transactionHandler) getState() txnHandlerState {
func (tc *transactionCoordinatorClient) newTransactionHandler(partition
uint64) (*transactionHandler, error) {
handler := &transactionHandler{
tc: tc,
+ cnxKeySuffix: tc.client.cnxPool.GenerateRoundRobinIndex(),
partition: partition,
closeCh: make(chan any),
requestCh: make(chan any),
@@ -95,8 +97,8 @@ func (t *transactionHandler) grabConn() error {
TcId: proto.Uint64(t.partition),
}
- res, err := t.tc.client.rpcClient.Request(lr.LogicalAddr,
lr.PhysicalAddr, requestID,
- pb.BaseCommand_TC_CLIENT_CONNECT_REQUEST, &cmdTCConnect)
+ res, err :=
t.tc.client.rpcClient.RequestWithCnxKeySuffix(lr.LogicalAddr, lr.PhysicalAddr,
t.cnxKeySuffix,
+ requestID, pb.BaseCommand_TC_CLIENT_CONNECT_REQUEST,
&cmdTCConnect)
if err != nil {
t.log.WithError(err).Error("Failed to connect transaction_impl
coordinator " +