This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new c0cba32  Avoid producer deadlock on connection closing (#337)
c0cba32 is described below

commit c0cba320e933fad15469e7f51219c2d0f0d96dcf
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Tue Jul 28 21:50:24 2020 -0700

    Avoid producer deadlock on connection closing (#337)
    
    * Avoid producer deadlock on connection closing
    
    * Fixed constants init
    
    * Avoid creating timer instance each time, if channel is not full
    
    * Added debug statements
---
 pulsar/internal/connection.go | 47 ++++++++++++++++++++++++++++++++++---------
 1 file changed, 37 insertions(+), 10 deletions(-)

diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 4be9ba2..0e13380 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -107,14 +107,14 @@ type ConsumerHandler interface {
        ConnectionClosed()
 }
 
-type connectionState int
+type connectionState int32
 
 const (
-       connectionInit connectionState = iota
-       connectionConnecting
-       connectionTCPConnected
-       connectionReady
-       connectionClosed
+       connectionInit         = 0
+       connectionConnecting   = 1
+       connectionTCPConnected = 2
+       connectionReady        = 3
+       connectionClosed       = 4
 )
 
 func (s connectionState) String() string {
@@ -150,7 +150,7 @@ type incomingCmd struct {
 type connection struct {
        sync.Mutex
        cond              *sync.Cond
-       state             connectionState
+       state             int32
        connectionTimeout time.Duration
 
        logicalAddr  *url.URL
@@ -190,7 +190,7 @@ type connection struct {
 func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSOptions,
        connectionTimeout time.Duration, auth auth.Provider) *connection {
        cnx := &connection{
-               state:                connectionInit,
+               state:                int32(connectionInit),
                connectionTimeout:    connectionTimeout,
                logicalAddr:          logicalAddr,
                physicalAddr:         physicalAddr,
@@ -397,7 +397,34 @@ func (c *connection) runPingCheck() {
 }
 
 func (c *connection) WriteData(data Buffer) {
-       c.writeRequestsCh <- data
+       select {
+       case c.writeRequestsCh <- data:
+               // Channel is not full
+               return
+
+       default:
+               // Channel full, fallback to probe if connection is closed
+       }
+
+       for {
+               select {
+               case c.writeRequestsCh <- data:
+                       // Successfully wrote on the channel
+                       return
+
+               case <-time.After(100 * time.Millisecond):
+                       // The channel is either:
+                       // 1. blocked, in which case we need to wait until we have space
+                       // 2. the connection is already closed, then we need to bail out
+                       c.log.Debug("Couldn't write on connection channel immediately")
+                       state := connectionState(atomic.LoadInt32(&c.state))
+                       if state != connectionReady {
+                               c.log.Debug("Connection was already closed")
+                               return
+                       }
+               }
+       }
+
 }
 
 func (c *connection) internalWriteData(data Buffer) {
@@ -729,7 +756,7 @@ func (c *connection) Close() {
 
 func (c *connection) changeState(state connectionState) {
        c.Lock()
-       c.state = state
+       atomic.StoreInt32(&c.state, int32(state))
        c.cond.Broadcast()
        c.Unlock()
 }

Reply via email to