This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch branch-0.13.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/branch-0.13.0 by this push:
     new 81eca356 fix: fix producer connection (#1243)
81eca356 is described below

commit 81eca356ec1486199e95ea8f4def005cc6b6e83d
Author: Zixuan Liu <node...@gmail.com>
AuthorDate: Fri Jul 12 19:56:22 2024 +0800

    fix: fix producer connection (#1243)
    
    * fix: fix producer connection
    
    * Fix test
    
    * Fix nil pointer
    
    * Fix GetConnection err
    
    * Fix cnx
    
    (cherry picked from commit 29f2779123c6979c95b6235f16e6fbde33ca681f)
---
 pulsar/internal/rpc_client.go | 32 ++++++++------------------------
 pulsar/producer_partition.go  | 20 ++++++++++++--------
 pulsar/producer_test.go       | 32 ++++++++++++++++++++++++++++++++
 3 files changed, 52 insertions(+), 32 deletions(-)

diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index fc923196..d2e3895e 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -150,12 +150,18 @@ func (c *rpcClient) RequestToHost(serviceNameResolver 
*ServiceNameResolver, requ
 
 func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, 
requestID uint64,
        cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) 
{
-       c.metrics.RPCRequestCount.Inc()
        cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
        if err != nil {
                return nil, err
        }
 
+       return c.RequestOnCnx(cnx, requestID, cmdType, message)
+}
+
+func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType 
pb.BaseCommand_Type,
+       message proto.Message) (*RPCResult, error) {
+       c.metrics.RPCRequestCount.Inc()
+
        ch := make(chan result, 1)
 
        cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response 
*pb.BaseCommand, err error) {
@@ -171,7 +177,7 @@ func (c *rpcClient) Request(logicalAddr *url.URL, 
physicalAddr *url.URL, request
                case res := <-ch:
                        // Ignoring producer not ready response.
                        // Continue to wait for the producer to create 
successfully
-                       if res.error == nil && *res.RPCResult.Response.Type == 
pb.BaseCommand_PRODUCER_SUCCESS {
+                       if res.error == nil && res.Response != nil && 
*res.RPCResult.Response.Type == pb.BaseCommand_PRODUCER_SUCCESS {
                                if 
!res.RPCResult.Response.ProducerSuccess.GetProducerReady() {
                                        timeoutCh = nil
                                        break
@@ -184,28 +190,6 @@ func (c *rpcClient) Request(logicalAddr *url.URL, 
physicalAddr *url.URL, request
        }
 }
 
-func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType 
pb.BaseCommand_Type,
-       message proto.Message) (*RPCResult, error) {
-       c.metrics.RPCRequestCount.Inc()
-
-       ch := make(chan result, 1)
-
-       cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response 
*pb.BaseCommand, err error) {
-               ch <- result{&RPCResult{
-                       Cnx:      cnx,
-                       Response: response,
-               }, err}
-               close(ch)
-       })
-
-       select {
-       case res := <-ch:
-               return res.RPCResult, res.error
-       case <-time.After(c.requestTimeout):
-               return nil, ErrRequestTimeOut
-       }
-}
-
 func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, cmdType 
pb.BaseCommand_Type, message proto.Message) error {
        c.metrics.RPCRequestCount.Inc()
        return cnx.SendRequestNoWait(baseCommand(cmdType, message))
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index f3749bc6..e0981303 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -284,20 +284,24 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL 
string) error {
 
        cnx, err := p.client.cnxPool.GetConnection(lr.LogicalAddr, 
lr.PhysicalAddr)
        // registering the producer first in case broker sends commands in the 
middle
-       if err == nil {
-               p._setConn(cnx)
-               err = p._getConn().RegisterListener(p.producerID, p)
-               if err != nil {
-                       p.log.WithError(err).Errorf("Failed to register 
listener: {%d}", p.producerID)
-               }
+       if err != nil {
+               p.log.Error("Failed to get connection")
+               return err
+       }
+
+       p._setConn(cnx)
+       err = p._getConn().RegisterListener(p.producerID, p)
+       if err != nil {
+               p.log.WithError(err).Errorf("Failed to register listener: 
{%d}", p.producerID)
        }
 
-       res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, 
id, pb.BaseCommand_PRODUCER, cmdProducer)
+       res, err := p.client.rpcClient.RequestOnCnx(cnx, id, 
pb.BaseCommand_PRODUCER, cmdProducer)
        if err != nil {
+               p._getConn().UnregisterListener(p.producerID)
                p.log.WithError(err).Error("Failed to create producer at send 
PRODUCER request")
                if errors.Is(err, internal.ErrRequestTimeOut) {
                        id := p.client.rpcClient.NewRequestID()
-                       _, _ = p.client.rpcClient.Request(lr.LogicalAddr, 
lr.PhysicalAddr, id, pb.BaseCommand_CLOSE_PRODUCER,
+                       _, _ = p.client.rpcClient.RequestOnCnx(cnx, id, 
pb.BaseCommand_CLOSE_PRODUCER,
                                &pb.CommandCloseProducer{
                                        ProducerId: &p.producerID,
                                        RequestId:  &id,
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 7c4ff897..a24ee457 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -29,6 +29,8 @@ import (
        "testing"
        "time"
 
+       "github.com/stretchr/testify/require"
+
        "github.com/stretchr/testify/assert"
        "google.golang.org/protobuf/proto"
 
@@ -2474,3 +2476,33 @@ func TestDisableReplication(t *testing.T) {
        assert.NoError(t, err)
        assert.Equal(t, []string{"__local__"}, msgMetadata.GetReplicateTo())
 }
+
+func TestProducerWithMaxConnectionsPerBroker(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL:                     serviceURL,
+               MaxConnectionsPerBroker: 8,
+       })
+       require.NoError(t, err)
+       defer client.Close()
+
+       for i := 0; i < 10; i++ {
+               testProducer, err := client.CreateProducer(ProducerOptions{
+                       Topic:  newTopicName(),
+                       Schema: NewBytesSchema(nil),
+               })
+               require.NoError(t, err)
+               require.NotNil(t, testProducer)
+
+               var ok int32
+               testProducer.SendAsync(context.Background(), 
&ProducerMessage{Value: []byte("hello")},
+                       func(id MessageID, producerMessage *ProducerMessage, 
err error) {
+                               if err == nil {
+                                       atomic.StoreInt32(&ok, 1)
+                               }
+                       })
+               require.Eventually(t, func() bool {
+                       return atomic.LoadInt32(&ok) == 1
+               }, 3*time.Second, time.Millisecond*100)
+               testProducer.Close()
+       }
+}

Reply via email to