gunli commented on code in PR #1333:
URL: https://github.com/apache/pulsar-client-go/pull/1333#discussion_r1965280262


##########
pulsar/producer_partition.go:
##########
@@ -581,7 +581,8 @@ func (p *partitionProducer) runEventsLoop() {
                        }
                case connectionClosed := <-p.connectClosedCh:
                        p.log.Info("runEventsLoop will reconnect in producer")
-                       p.reconnectToBroker(connectionClosed)
+                       // reconnect to broker in a new goroutine so that it won't block the event loop, see issue #1332
+                       go p.reconnectToBroker(connectionClosed)

Review Comment:
   > > I think the `connection.State` is sufficient and timely enough to 
control this, because the reconnection state of a producer relies on 
`connection.State` and the notification from a `connection`. If the connection 
is closed, we should stop sending.
   > 
   > A connection has multiple producers and consumers, please consider one 
case the connection is active, the producer is inactive, you should not send 
the message to the broker.
   > 
   > 
https://github.com/apache/pulsar/blob/v4.0.2/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2366-L2382,
 this is java implementation.
   
   I looked into the Java code, but I still can't follow it: the Java 
implementation encapsulates too many details, and I know little about Java.
   
   You said `please consider one case the connection is active, the 
producer is inactive`, but as far as I know, only the connection's close event 
can switch a producer from active to inactive.
   
   Regardless, the current implementation of the connection in 
pulsar-client-go is problematic: `c.internalSendRequest(req)` and 
`c.internalWriteData(data)`/`c.sendPing()` can run at the same time because 
they run in two different goroutines, and all of them eventually call 
`c.cnx.Write()`, which can cause conflicting writes. IMO this can be 
considered a bug, but we can fix it in another PR.
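
If the concurrent writes do turn out to be a problem, one possible direction is to funnel every write through a single lock. The following is only a minimal sketch under that assumption: `serializedWriter` and `writeConcurrently` are hypothetical names that do not exist in pulsar-client-go, and a `bytes.Buffer` stands in for the real connection.

```go
package main

import (
	"bytes"
	"io"
	"sync"
)

// serializedWriter is a hypothetical wrapper (not part of pulsar-client-go)
// that lets several goroutines share one io.Writer without interleaving.
type serializedWriter struct {
	mu sync.Mutex
	w  io.Writer
}

// Write holds the mutex for the whole call, so a frame written by one
// goroutine is never split by a frame written by another.
func (s *serializedWriter) Write(frame []byte) (int, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.w.Write(frame)
}

// writeConcurrently imitates two loops that write complete frames through
// the same wrapper and returns everything that reached the underlying writer.
func writeConcurrently(framesPerWriter int) []byte {
	var sink bytes.Buffer // stand-in for the real connection
	sw := &serializedWriter{w: &sink}

	var wg sync.WaitGroup
	for _, frame := range [][]byte{[]byte("REQ;"), []byte("PING")} {
		wg.Add(1)
		go func(f []byte) {
			defer wg.Done()
			for i := 0; i < framesPerWriter; i++ {
				sw.Write(f) // error ignored: bytes.Buffer writes do not fail
			}
		}(frame)
	}
	wg.Wait()
	return sink.Bytes()
}
```

Calling `writeConcurrently(100)` from a `main` function should yield 800 bytes in which every 4-byte chunk is a whole `REQ;` or `PING` frame. An alternative with the same effect would be a single writer goroutine that owns the connection and receives frames over a channel.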
   
   Also, the connection uses `incomingRequestsCh` with a capacity of 10 
(`incomingRequestsCh: make(chan *request, 10)`) to receive and hold the user's 
data. This is effectively another `pendingQueue`, so we now have two 
`pendingQueue`s, one in the producer and one in the connection. Consider this 
case: a user's data/buffer is put into `producer.pendingQueue` and then sent 
into `connection.incomingRequestsCh`, but `c.cnx.Write()` has not been called 
yet. If the timeout config is short, the request is treated as timed out by 
`producer.failTimeoutMessages()` and the buffer is put back into the pool, 
while `connection.incomingRequestsCh` still holds a reference to it. If the 
pool then hands the buffer out again and the connection reaches 
`c.cnx.Write()`, there will be a data race. This can be considered another 
bug too, and it needs a separate PR as well.
   
   ```go
   go func() {
                for {
                        select {
                        case <-c.closeCh:
                                c.failLeftRequestsWhenClose()
                                return
   
                        case req := <-c.incomingRequestsCh:
                                if req == nil {
                                        return // TODO: this never gonna be happen
                                }
                                c.internalSendRequest(req)
                        }
                }
        }()
   
        for {
                select {
                case <-c.closeCh:
                        return
   
                case cmd := <-c.incomingCmdCh:
                        c.internalReceivedCommand(cmd.cmd, cmd.headersAndPayload)
                case data := <-c.writeRequestsCh:
                        if data == nil {
                                return
                        }
                        c.internalWriteData(data)
   
                case <-pingSendTicker.C:
                        c.sendPing()
                }
        }
   ```
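
For the buffer-reuse case described above (a timed-out request returns its buffer to the pool while the connection's queue still references it), one possible mitigation is reference counting, so that the buffer is recycled only after the last holder lets go. This is a rough sketch, not the library's code: `refBuffer`, `acquire`, `Retain`, `Release`, and `bufferPool` are hypothetical names introduced here for illustration.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// bufferPool is a hypothetical pool of reusable buffers.
var bufferPool = sync.Pool{
	New: func() interface{} { return &refBuffer{data: make([]byte, 0, 64)} },
}

// refBuffer is a hypothetical reference-counted buffer.
type refBuffer struct {
	data []byte
	refs int32
}

// acquire takes a buffer from the pool with one reference held by the caller
// (for example, the producer's pending queue).
func acquire() *refBuffer {
	b := bufferPool.Get().(*refBuffer)
	b.data = b.data[:0]
	atomic.StoreInt32(&b.refs, 1)
	return b
}

// Retain adds a reference for another holder (for example, the
// connection's request channel).
func (b *refBuffer) Retain() {
	atomic.AddInt32(&b.refs, 1)
}

// Release drops one reference. The buffer goes back to the pool only when
// the last reference is dropped; the return value reports whether it did.
func (b *refBuffer) Release() bool {
	if atomic.AddInt32(&b.refs, -1) == 0 {
		bufferPool.Put(b)
		return true
	}
	return false
}
```

In this scheme the producer would call `acquire()`, the connection would call `Retain()` when it enqueues the request, and each side would call `Release()` when it is done. A timeout on the producer side then returns `false` from `Release()` and leaves the buffer alone until the connection has finished with it.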



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
