This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e71a47a [fix] [client] fix same producer/consumer use more than one 
connection per broker (#1323)
4e71a47a is described below

commit 4e71a47a4f6174f883905e740ba35f2498494ed1
Author: Baodi Shi <[email protected]>
AuthorDate: Fri Jan 3 09:55:49 2025 +0800

    [fix] [client] fix same producer/consumer use more than one connection per 
broker (#1323)
    
    * [fix] [client] fix same producer/consumer use more than one connection 
per broker
    
    * Fix lint
    
    * Apply suggestions from code review
    
    Co-authored-by: Zike Yang <[email protected]>
    
    * Address comment
    
    * Addressed comment
    
    * make lint
    
    ---------
    
    Co-authored-by: Zike Yang <[email protected]>
---
 pulsar/consumer_partition.go             |  8 +++++---
 pulsar/consumer_test.go                  | 28 +++++++++++++++++++++++++++
 pulsar/internal/connection_pool.go       | 33 ++++++++++++++++----------------
 pulsar/internal/lookup_service_test.go   | 12 ++++++++++++
 pulsar/internal/rpc_client.go            | 11 ++++++++++-
 pulsar/producer_partition.go             |  6 ++++--
 pulsar/producer_test.go                  | 31 ++++++++++++++++++++++++++++++
 pulsar/transaction_coordinator_client.go |  6 ++++--
 8 files changed, 111 insertions(+), 24 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 520d9e8d..24ffa401 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -141,7 +141,8 @@ type partitionConsumer struct {
        state          uAtomic.Int32
        options        *partitionConsumerOpts
 
-       conn atomic.Pointer[internal.Connection]
+       conn         atomic.Pointer[internal.Connection]
+       cnxKeySuffix int32
 
        topic        string
        name         string
@@ -351,6 +352,7 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
                parentConsumer:             parent,
                client:                     client,
                options:                    options,
+               cnxKeySuffix:               
client.cnxPool.GenerateRoundRobinIndex(),
                topic:                      options.topic,
                name:                       options.consumerName,
                consumerID:                 client.rpcClient.NewConsumerID(),
@@ -1964,7 +1966,7 @@ func (pc *partitionConsumer) grabConn(assignedBrokerURL 
string) error {
                cmdSubscribe.ForceTopicCreation = proto.Bool(false)
        }
 
-       res, err := pc.client.rpcClient.Request(lr.LogicalAddr, 
lr.PhysicalAddr, requestID,
+       res, err := pc.client.rpcClient.RequestWithCnxKeySuffix(lr.LogicalAddr, 
lr.PhysicalAddr, pc.cnxKeySuffix, requestID,
                pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
 
        if err != nil {
@@ -1975,7 +1977,7 @@ func (pc *partitionConsumer) grabConn(assignedBrokerURL 
string) error {
                                ConsumerId: proto.Uint64(pc.consumerID),
                                RequestId:  proto.Uint64(requestID),
                        }
-                       _, _ = pc.client.rpcClient.Request(lr.LogicalAddr, 
lr.PhysicalAddr, requestID,
+                       _, _ = 
pc.client.rpcClient.RequestWithCnxKeySuffix(lr.LogicalAddr, lr.PhysicalAddr, 
pc.cnxKeySuffix, requestID,
                                pb.BaseCommand_CLOSE_CONSUMER, cmdClose)
                }
                return err
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 3445f425..33e4d057 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -5038,3 +5038,31 @@ func TestClientVersion(t *testing.T) {
        assert.True(t, strings.HasSuffix(publisher.ClientVersion, 
"-test-client"))
 
 }
+
+func TestSelectConnectionForSameConsumer(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL:                     serviceURL,
+               MaxConnectionsPerBroker: 10,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       topicName := newTopicName()
+
+       _consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: "sub-1",
+               Type:             Shared,
+       })
+       assert.NoError(t, err)
+       defer _consumer.Close()
+
+       partitionConsumerImpl := _consumer.(*consumer).consumers[0]
+       conn := partitionConsumerImpl._getConn()
+
+       for i := 0; i < 5; i++ {
+               assert.NoError(t, partitionConsumerImpl.grabConn(""))
+               assert.Equal(t, conn.ID(), 
partitionConsumerImpl._getConn().ID(),
+                       "The consumer uses a different connection when 
reconnecting")
+       }
+}
diff --git a/pulsar/internal/connection_pool.go 
b/pulsar/internal/connection_pool.go
index ee583825..cd082188 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -32,11 +32,14 @@ import (
 // ConnectionPool is a interface of connection pool.
 type ConnectionPool interface {
        // GetConnection get a connection from ConnectionPool.
-       GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, 
error)
+       GetConnection(logicalAddr *url.URL, physicalAddr *url.URL, keySuffix 
int32) (Connection, error)
 
        // GetConnections get all connections in the pool.
        GetConnections() map[string]Connection
 
+       // GenerateRoundRobinIndex generates a round-robin index.
+       GenerateRoundRobinIndex() int32
+
        // Close all the connections in the pool
        Close()
 }
@@ -47,8 +50,8 @@ type connectionPool struct {
        connectionTimeout     time.Duration
        tlsOptions            *TLSOptions
        auth                  auth.Provider
-       maxConnectionsPerHost int32
-       roundRobinCnt         int32
+       maxConnectionsPerHost uint32
+       roundRobinCnt         uint32
        keepAliveInterval     time.Duration
        closeCh               chan struct{}
 
@@ -73,7 +76,7 @@ func NewConnectionPool(
                tlsOptions:            tlsOptions,
                auth:                  auth,
                connectionTimeout:     connectionTimeout,
-               maxConnectionsPerHost: int32(maxConnectionsPerHost),
+               maxConnectionsPerHost: uint32(maxConnectionsPerHost),
                keepAliveInterval:     keepAliveInterval,
                log:                   logger,
                metrics:               metrics,
@@ -84,9 +87,12 @@ func NewConnectionPool(
        return p
 }
 
-func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr 
*url.URL) (Connection, error) {
-       p.log.WithField("logicalAddr", logicalAddr).WithField("physicalAddr", 
physicalAddr).Debug("Getting pooled connection")
-       key := p.getMapKey(logicalAddr, physicalAddr)
+func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr 
*url.URL,
+       keySuffix int32) (Connection, error) {
+       p.log.WithField("logicalAddr", logicalAddr).
+               WithField("physicalAddr", physicalAddr).
+               WithField("keySuffix", keySuffix).Debug("Getting pooled 
connection")
+       key := fmt.Sprint(logicalAddr.Host, "-", physicalAddr.Host, "-", 
keySuffix)
 
        p.Lock()
        conn, ok := p.connections[key]
@@ -141,6 +147,10 @@ func (p *connectionPool) GetConnections() 
map[string]Connection {
        return conns
 }
 
+func (p *connectionPool) GenerateRoundRobinIndex() int32 {
+       return int32(atomic.AddUint32(&p.roundRobinCnt, 1) % 
p.maxConnectionsPerHost)
+}
+
 func (p *connectionPool) Close() {
        p.Lock()
        close(p.closeCh)
@@ -151,15 +161,6 @@ func (p *connectionPool) Close() {
        p.Unlock()
 }
 
-func (p *connectionPool) getMapKey(logicalAddr *url.URL, physicalAddr 
*url.URL) string {
-       cnt := atomic.AddInt32(&p.roundRobinCnt, 1)
-       if cnt < 0 {
-               cnt = -cnt
-       }
-       idx := cnt % p.maxConnectionsPerHost
-       return fmt.Sprint(logicalAddr.Host, "-", physicalAddr.Host, "-", idx)
-}
-
 func (p *connectionPool) checkAndCleanIdleConnections(maxIdleTime 
time.Duration) {
        if maxIdleTime < 0 {
                return
diff --git a/pulsar/internal/lookup_service_test.go 
b/pulsar/internal/lookup_service_test.go
index 733586c3..4692f186 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -108,6 +108,12 @@ func (c *mockedLookupRPCClient) Request(logicalAddr 
*url.URL, physicalAddr *url.
        }, nil
 }
 
+func (c *mockedLookupRPCClient) RequestWithCnxKeySuffix(_ *url.URL, _ *url.URL,
+       _ int32, _ uint64, _ pb.BaseCommand_Type, _ proto.Message) (*RPCResult, 
error) {
+       assert.Fail(c.t, "Shouldn't be called")
+       return nil, nil
+}
+
 func (c *mockedLookupRPCClient) RequestOnCnx(_ Connection, _ uint64, _ 
pb.BaseCommand_Type,
        _ proto.Message) (*RPCResult, error) {
        assert.Fail(c.t, "Shouldn't be called")
@@ -492,6 +498,12 @@ func (m mockedPartitionedTopicMetadataRPCClient) Request(_ 
*url.URL, _ *url.URL,
        return nil, nil
 }
 
+func (m *mockedPartitionedTopicMetadataRPCClient) RequestWithCnxKeySuffix(_ 
*url.URL, _ *url.URL,
+       _ int32, _ uint64, _ pb.BaseCommand_Type, _ proto.Message) (*RPCResult, 
error) {
+       assert.Fail(m.t, "Shouldn't be called")
+       return nil, nil
+}
+
 func (m mockedPartitionedTopicMetadataRPCClient) RequestOnCnxNoWait(_ 
Connection, _ pb.BaseCommand_Type,
        _ proto.Message) error {
        assert.Fail(m.t, "Shouldn't be called")
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 9a88a167..342fdedd 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -64,6 +64,9 @@ type RPCClient interface {
        RequestToHost(serviceNameResolver *ServiceNameResolver, requestID 
uint64,
                cmdType pb.BaseCommand_Type, message proto.Message) 
(*RPCResult, error)
 
+       RequestWithCnxKeySuffix(logicalAddr *url.URL, physicalAddr *url.URL, 
cnxKeySuffix int32, requestID uint64,
+               cmdType pb.BaseCommand_Type, message proto.Message) 
(*RPCResult, error)
+
        Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
                cmdType pb.BaseCommand_Type, message proto.Message) 
(*RPCResult, error)
 
@@ -154,7 +157,13 @@ func (c *rpcClient) RequestToHost(serviceNameResolver 
*ServiceNameResolver, requ
 
 func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, 
requestID uint64,
        cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) 
{
-       cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
+       return c.RequestWithCnxKeySuffix(logicalAddr, physicalAddr, 
c.pool.GenerateRoundRobinIndex(),
+               requestID, cmdType, message)
+}
+
+func (c *rpcClient) RequestWithCnxKeySuffix(logicalAddr *url.URL, physicalAddr 
*url.URL, cnxKeySuffix int32,
+       requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) 
(*RPCResult, error) {
+       cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr, 
cnxKeySuffix)
        if err != nil {
                return nil, err
        }
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index d83cf064..4b950dff 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -96,7 +96,8 @@ type partitionProducer struct {
        topic  string
        log    log.Logger
 
-       conn uAtomic.Value
+       conn         uAtomic.Value
+       cnxKeySuffix int32
 
        options                  *ProducerOptions
        producerName             string
@@ -179,6 +180,7 @@ func newPartitionProducer(client *client, topic string, 
options *ProducerOptions
                client:           client,
                topic:            topic,
                log:              logger,
+               cnxKeySuffix:     client.cnxPool.GenerateRoundRobinIndex(),
                options:          options,
                producerID:       client.rpcClient.NewProducerID(),
                dataChan:         make(chan *sendRequest, maxPendingMessages),
@@ -301,7 +303,7 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL 
string) error {
                cmdProducer.Metadata = toKeyValues(p.options.Properties)
        }
 
-       cnx, err := p.client.cnxPool.GetConnection(lr.LogicalAddr, 
lr.PhysicalAddr)
+       cnx, err := p.client.cnxPool.GetConnection(lr.LogicalAddr, 
lr.PhysicalAddr, p.cnxKeySuffix)
        // registering the producer first in case broker sends commands in the 
middle
        if err != nil {
                p.log.Error("Failed to get connection")
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 24939a82..3af4a89c 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -2574,3 +2574,34 @@ func TestProducerKeepReconnectingAndThenCallClose(t 
*testing.T) {
                return true
        }, 30*time.Second, 1*time.Second)
 }
+
+func TestSelectConnectionForSameProducer(t *testing.T) {
+       topicName := newTopicName()
+
+       client, err := NewClient(ClientOptions{
+               URL:                     serviceURL,
+               MaxConnectionsPerBroker: 10,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       reconnectNum := uint(1)
+       _producer, err := client.CreateProducer(ProducerOptions{
+               Topic:                topicName,
+               MaxReconnectToBroker: &reconnectNum,
+       })
+       assert.NoError(t, err)
+       defer _producer.Close()
+
+       partitionProducerImp := 
_producer.(*producer).producers[0].(*partitionProducer)
+       conn := partitionProducerImp._getConn()
+
+       for i := 0; i < 5; i++ {
+               partitionProducerImp.grabCnx("")
+               currentConn := partitionProducerImp._getConn()
+               assert.Equal(t, conn.ID(), currentConn.ID(),
+                       "The producer uses a different connection when 
reconnecting")
+       }
+
+       client.Close()
+}
diff --git a/pulsar/transaction_coordinator_client.go 
b/pulsar/transaction_coordinator_client.go
index e66b06ef..9382da6e 100644
--- a/pulsar/transaction_coordinator_client.go
+++ b/pulsar/transaction_coordinator_client.go
@@ -46,6 +46,7 @@ type transactionHandler struct {
        tc              *transactionCoordinatorClient
        state           uAtomic.Int32
        conn            uAtomic.Value
+       cnxKeySuffix    int32
        partition       uint64
        closeCh         chan any
        requestCh       chan any
@@ -67,6 +68,7 @@ func (t *transactionHandler) getState() txnHandlerState {
 func (tc *transactionCoordinatorClient) newTransactionHandler(partition 
uint64) (*transactionHandler, error) {
        handler := &transactionHandler{
                tc:              tc,
+               cnxKeySuffix:    tc.client.cnxPool.GenerateRoundRobinIndex(),
                partition:       partition,
                closeCh:         make(chan any),
                requestCh:       make(chan any),
@@ -95,8 +97,8 @@ func (t *transactionHandler) grabConn() error {
                TcId:      proto.Uint64(t.partition),
        }
 
-       res, err := t.tc.client.rpcClient.Request(lr.LogicalAddr, 
lr.PhysicalAddr, requestID,
-               pb.BaseCommand_TC_CLIENT_CONNECT_REQUEST, &cmdTCConnect)
+       res, err := 
t.tc.client.rpcClient.RequestWithCnxKeySuffix(lr.LogicalAddr, lr.PhysicalAddr, 
t.cnxKeySuffix,
+               requestID, pb.BaseCommand_TC_CLIENT_CONNECT_REQUEST, 
&cmdTCConnect)
 
        if err != nil {
                t.log.WithError(err).Error("Failed to connect transaction_impl 
coordinator " +

Reply via email to