This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 12a6a03  [Issue #145] fix producer cannot connect to broker through 
pulsar proxy (#146)
12a6a03 is described below

commit 12a6a03e9c31fa6892f4219e5850b5d7d298e889
Author: Rui Fu <[email protected]>
AuthorDate: Tue Dec 31 13:01:38 2019 +0800

    [Issue #145] fix producer cannot connect to broker through pulsar proxy 
(#146)
    
    If a producer connects to the broker through the Pulsar proxy, the producer 
gets a "Short read when reading frame size" error because the Pulsar proxy does 
not pass commands to the broker.
---
 pulsar/consumer_partition.go           |  2 +-
 pulsar/internal/connection.go          | 44 ++++++++++++++++++----------------
 pulsar/internal/connection_pool.go     | 12 ++++++----
 pulsar/internal/lookup_service.go      | 12 ++++++----
 pulsar/internal/lookup_service_test.go |  2 +-
 pulsar/internal/rpc_client.go          |  8 +++----
 pulsar/producer_partition.go           |  4 ++--
 7 files changed, 46 insertions(+), 38 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 55d1318..9d47581 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -530,7 +530,7 @@ func (pc *partitionConsumer) grabConn() error {
        }
 
        res, err := pc.client.rpcClient.Request(lr.LogicalAddr, 
lr.PhysicalAddr, requestID,
-               pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
+               pb.BaseCommand_SUBSCRIBE, cmdSubscribe, 
lr.ConnectingThroughProxy)
 
        if err != nil {
                pc.log.WithError(err).Error("Failed to create consumer")
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 7a7cd5f..590b92f 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -119,9 +119,10 @@ type connection struct {
        state             connectionState
        connectionTimeout time.Duration
 
-       logicalAddr  *url.URL
-       physicalAddr *url.URL
-       cnx          net.Conn
+       logicalAddr            *url.URL
+       physicalAddr           *url.URL
+       connectingThroughProxy bool
+       cnx                    net.Conn
 
        writeBufferLock sync.Mutex
        writeBuffer     Buffer
@@ -152,20 +153,21 @@ type connection struct {
 }
 
 func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions 
*TLSOptions,
-       connectionTimeout time.Duration, auth auth.Provider) *connection {
+       connectionTimeout time.Duration, auth auth.Provider, 
connectingThroughProxy bool) *connection {
        cnx := &connection{
-               state:                connectionInit,
-               connectionTimeout:    connectionTimeout,
-               logicalAddr:          logicalAddr,
-               physicalAddr:         physicalAddr,
-               writeBuffer:          NewBuffer(4096),
-               log:                  log.WithField("remote_addr", 
physicalAddr),
-               pendingReqs:          make(map[uint64]*request),
-               lastDataReceivedTime: time.Now(),
-               pingTicker:           time.NewTicker(keepAliveInterval),
-               pingCheckTicker:      time.NewTicker(keepAliveInterval),
-               tlsOptions:           tlsOptions,
-               auth:                 auth,
+               state:                  connectionInit,
+               connectionTimeout:      connectionTimeout,
+               logicalAddr:            logicalAddr,
+               physicalAddr:           physicalAddr,
+               connectingThroughProxy: connectingThroughProxy,
+               writeBuffer:            NewBuffer(4096),
+               log:                    log.WithField("remote_addr", 
physicalAddr),
+               pendingReqs:            make(map[uint64]*request),
+               lastDataReceivedTime:   time.Now(),
+               pingTicker:             time.NewTicker(keepAliveInterval),
+               pingCheckTicker:        time.NewTicker(keepAliveInterval),
+               tlsOptions:             tlsOptions,
+               auth:                   auth,
 
                closeCh:            make(chan interface{}),
                incomingRequestsCh: make(chan *request, 10),
@@ -248,14 +250,16 @@ func (c *connection) doHandshake() bool {
        // During the initial handshake, the internal keep alive is not
        // active yet, so we need to timeout write and read requests
        c.cnx.SetDeadline(time.Now().Add(keepAliveInterval))
-
-       c.writeCommand(baseCommand(pb.BaseCommand_CONNECT, &pb.CommandConnect{
+       cmdConnect := &pb.CommandConnect{
                ProtocolVersion: &version,
                ClientVersion:   proto.String("Pulsar Go 0.1"),
                AuthMethodName:  proto.String(c.auth.Name()),
                AuthData:        authData,
-       }))
-
+       }
+       if c.connectingThroughProxy {
+               cmdConnect.ProxyToBrokerUrl = proto.String(c.logicalAddr.Host)
+       }
+       c.writeCommand(baseCommand(pb.BaseCommand_CONNECT, cmdConnect))
        cmd, _, err := c.reader.readSingleCommand()
        if err != nil {
                c.log.WithError(err).Warn("Failed to perform initial handshake")
diff --git a/pulsar/internal/connection_pool.go 
b/pulsar/internal/connection_pool.go
index 14f7753..efb5cf1 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -18,6 +18,7 @@
 package internal
 
 import (
+       "fmt"
        "net/url"
        "sync"
        "time"
@@ -30,7 +31,7 @@ import (
 // ConnectionPool is a interface of connection pool.
 type ConnectionPool interface {
        // GetConnection get a connection from ConnectionPool.
-       GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, 
error)
+       GetConnection(logicalAddr *url.URL, physicalAddr *url.URL, 
connectingThroughProxy bool) (Connection, error)
 
        // Close all the connections in the pool
        Close()
@@ -52,8 +53,9 @@ func NewConnectionPool(tlsOptions *TLSOptions, auth 
auth.Provider, connectionTim
        }
 }
 
-func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr 
*url.URL) (Connection, error) {
-       cachedCnx, found := p.pool.Load(logicalAddr.Host)
+func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr 
*url.URL,
+       connectingThroughProxy bool) (Connection, error) {
+       cachedCnx, found := p.pool.Load(fmt.Sprintf("%s:%v", logicalAddr.Host, 
connectingThroughProxy))
        if found {
                cnx := cachedCnx.(*connection)
                log.Debug("Found connection in cache:", cnx.logicalAddr, 
cnx.physicalAddr)
@@ -68,8 +70,8 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, 
physicalAddr *url.U
        }
 
        // Try to create a new connection
-       newCnx, wasCached := p.pool.LoadOrStore(logicalAddr.Host,
-               newConnection(logicalAddr, physicalAddr, p.tlsOptions, 
p.connectionTimeout, p.auth))
+       newCnx, wasCached := p.pool.LoadOrStore(fmt.Sprintf("%s:%v", 
logicalAddr.Host, connectingThroughProxy),
+               newConnection(logicalAddr, physicalAddr, p.tlsOptions, 
p.connectionTimeout, p.auth, connectingThroughProxy))
        cnx := newCnx.(*connection)
        if !wasCached {
                cnx.start()
diff --git a/pulsar/internal/lookup_service.go 
b/pulsar/internal/lookup_service.go
index ee6dffe..e056ea5 100644
--- a/pulsar/internal/lookup_service.go
+++ b/pulsar/internal/lookup_service.go
@@ -30,8 +30,9 @@ import (
 
 // LookupResult encapsulates a struct for lookup a request, containing two 
parts: LogicalAddr, PhysicalAddr.
 type LookupResult struct {
-       LogicalAddr  *url.URL
-       PhysicalAddr *url.URL
+       LogicalAddr            *url.URL
+       PhysicalAddr           *url.URL
+       ConnectingThroughProxy bool
 }
 
 // LookupService is a interface of lookup service.
@@ -105,7 +106,7 @@ func (ls *lookupService) Lookup(topic string) 
(*LookupResult, error) {
                                RequestId:     &id,
                                Topic:         &topic,
                                Authoritative: lr.Authoritative,
-                       })
+                       }, false)
                        if err != nil {
                                return nil, err
                        }
@@ -123,8 +124,9 @@ func (ls *lookupService) Lookup(topic string) 
(*LookupResult, error) {
                        }
 
                        return &LookupResult{
-                               LogicalAddr:  logicalAddress,
-                               PhysicalAddr: physicalAddress,
+                               LogicalAddr:            logicalAddress,
+                               PhysicalAddr:           physicalAddress,
+                               ConnectingThroughProxy: 
lr.GetProxyThroughServiceUrl(),
                        }, nil
 
                case pb.CommandLookupTopicResponse_Failed:
diff --git a/pulsar/internal/lookup_service_test.go 
b/pulsar/internal/lookup_service_test.go
index 5bb1724..cce6198 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -70,7 +70,7 @@ func (c *mockedRPCClient) RequestToAnyBroker(requestID 
uint64, cmdType pb.BaseCo
 }
 
 func (c *mockedRPCClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, 
requestID uint64,
-       cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) 
{
+       cmdType pb.BaseCommand_Type, message proto.Message, 
connectingThroughProxy bool) (*RPCResult, error) {
        assert.Equal(c.t, cmdType, pb.BaseCommand_LOOKUP)
        expectedRequest := &c.expectedRequests[0]
        c.expectedRequests = c.expectedRequests[1:]
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 23c3b61..3b5e622 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -45,7 +45,7 @@ type RPCClient interface {
        RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type, 
message proto.Message) (*RPCResult, error)
 
        Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
-               cmdType pb.BaseCommand_Type, message proto.Message) 
(*RPCResult, error)
+               cmdType pb.BaseCommand_Type, message proto.Message, 
connectingThroughProxy bool) (*RPCResult, error)
 
        RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message 
proto.Message)
 
@@ -71,13 +71,13 @@ func NewRPCClient(serviceURL *url.URL, pool ConnectionPool, 
requestTimeout time.
 
 func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType 
pb.BaseCommand_Type,
        message proto.Message) (*RPCResult, error) {
-       return c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, 
message)
+       return c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, 
message, false)
 }
 
 func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, 
requestID uint64,
-       cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) 
{
+       cmdType pb.BaseCommand_Type, message proto.Message, 
connectingThroughProxy bool) (*RPCResult, error) {
        // TODO: Add retry logic in case of connection issues
-       cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
+       cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr, 
connectingThroughProxy)
        if err != nil {
                return nil, err
        }
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index f96a706..0963570 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -140,8 +140,8 @@ func (p *partitionProducer) grabCnx() error {
        if len(p.options.Properties) > 0 {
                cmdProducer.Metadata = toKeyValues(p.options.Properties)
        }
-
-       res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, 
id, pb.BaseCommand_PRODUCER, cmdProducer)
+       res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, 
id, pb.BaseCommand_PRODUCER, cmdProducer,
+               lr.ConnectingThroughProxy)
        if err != nil {
                p.log.WithError(err).Error("Failed to create producer")
                return err

Reply via email to