gunli commented on code in PR #1333:
URL: https://github.com/apache/pulsar-client-go/pull/1333#discussion_r1965280262
##########
pulsar/producer_partition.go:
##########
@@ -581,7 +581,8 @@ func (p *partitionProducer) runEventsLoop() {
}
case connectionClosed := <-p.connectClosedCh:
p.log.Info("runEventsLoop will reconnect in producer")
- p.reconnectToBroker(connectionClosed)
+ // reconnect to broker in a new goroutine so that it won't block the event loop, see issue #1332
+ go p.reconnectToBroker(connectionClosed)
Review Comment:
> > I think the `connection.State` is sufficient and timely enough to
control this, because the reconnection state of a producer relies on
`connection.State` and the notification from a `connection`. If the connection
is closed, we should stop sending.
>
> A connection has multiple producers and consumers, please consider one
case the connection is active, the producer is inactive, you should not send
the message to the broker.
>
>
> https://github.com/apache/pulsar/blob/v4.0.2/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2366-L2382,
> this is the Java implementation.

I looked into the Java code, but I still can't follow it: the Java
implementation encapsulates too many details and I know little about Java.
You said, `please consider one case the connection is active, the
producer is inactive`. As far as I know, only the connection's close event can
move a producer from active to inactive.

Regardless, the current implementation of the connection in pulsar-client-go
is problematic: `c.internalSendRequest(req)` and
`c.internalWriteData(data)`/`c.sendPing()` run in two different goroutines, so
they can execute at the same time, and both eventually call `c.cnx.Write()`.
That can cause conflicting concurrent writes on the connection. IMO this can be
considered a BUG, but we can fix it in another PR.
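
A minimal sketch of one way such writes could be serialized, under stated
assumptions: `lockedWriter`, `WriteFrame`, `writeConcurrently` and
`framesIntact` are hypothetical names for illustration and are not part of
pulsar-client-go, and a `bytes.Buffer` stands in for the real connection. The
idea is only that every goroutine writes a complete frame while holding one
lock, so the header and payload of different frames cannot interleave.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"sync"
)

// lockedWriter is a hypothetical wrapper (not pulsar-client-go code) that
// lets several goroutines share one io.Writer.
type lockedWriter struct {
	mu sync.Mutex
	w  io.Writer
}

// WriteFrame writes header and payload as one unit while holding the lock,
// so another goroutine cannot write between the two calls.
func (l *lockedWriter) WriteFrame(header, payload []byte) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if _, err := l.w.Write(header); err != nil {
		return err
	}
	_, err := l.w.Write(payload)
	return err
}

// writeConcurrently starts the given number of goroutines; each one writes
// framesEach 4-byte frames made of its own letter ('A', 'B', ...).
func writeConcurrently(goroutines, framesEach int) []byte {
	var buf bytes.Buffer
	lw := &lockedWriter{w: &buf}
	var wg sync.WaitGroup
	for g := 0; g < goroutines; g++ {
		wg.Add(1)
		go func(id byte) {
			defer wg.Done()
			for i := 0; i < framesEach; i++ {
				_ = lw.WriteFrame([]byte{id, id}, []byte{id, id})
			}
		}(byte('A' + g))
	}
	wg.Wait()
	return buf.Bytes()
}

// framesIntact reports whether every 4-byte frame holds a single letter,
// i.e. no frame was split by a write from another goroutine.
func framesIntact(out []byte) bool {
	if len(out)%4 != 0 {
		return false
	}
	for i := 0; i < len(out); i += 4 {
		if out[i] != out[i+1] || out[i] != out[i+2] || out[i] != out[i+3] {
			return false
		}
	}
	return true
}

func main() {
	out := writeConcurrently(2, 1000)
	fmt.Println(len(out), framesIntact(out))
}
```

An alternative with the same effect would be to route every write through a
single goroutine; which option fits the existing event loop better is a
question for the follow-up PR.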

Also, the connection uses `incomingRequestsCh` with a capacity of 10
(`incomingRequestsCh: make(chan *request, 10)`) to receive and hold the user's
data. This can be considered another `pendingQueue`, so now we have two
`pendingQueue`s, one in the producer and one in the connection. Think about
this case: a user's data/buffer is put into `producer.pendingQueue` and then
into `connection.incomingRequestsCh`, but `c.cnx.Write()` has not been called
yet. If the timeout config is short, `producer.failTimeoutMessages()` treats
the request as timed out and puts the buffer back into the pool, while
`connection.incomingRequestsCh` still holds a reference to it. If the pool
hands the buffer out again and the connection then reaches `c.cnx.Write()`,
there will be a data race. This can be considered another BUG too, and it needs
a new PR as well.
```go
go func() {
	for {
		select {
		case <-c.closeCh:
			c.failLeftRequestsWhenClose()
			return
		case req := <-c.incomingRequestsCh:
			if req == nil {
				return // TODO: this never gonna be happen
			}
			c.internalSendRequest(req)
		}
	}
}()
for {
	select {
	case <-c.closeCh:
		return
	case cmd := <-c.incomingCmdCh:
		c.internalReceivedCommand(cmd.cmd, cmd.headersAndPayload)
	case data := <-c.writeRequestsCh:
		if data == nil {
			return
		}
		c.internalWriteData(data)
	case <-pingSendTicker.C:
		c.sendPing()
	}
}
```
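
For the buffer-reuse case, here is a minimal sketch of one possible direction,
assuming reference counting is acceptable: the buffer goes back to the pool
only after every holder has released it. All names here (`bufferPool`,
`sharedBuffer`, `Retain`, `Release`, `simulateTimeoutBeforeWrite`) are
hypothetical and do not exist in pulsar-client-go; the buffered channel only
stands in for `incomingRequestsCh`.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// bufferPool is a hypothetical free list, used instead of sync.Pool so the
// number of idle buffers can be observed.
type bufferPool struct {
	mu   sync.Mutex
	free []*sharedBuffer
}

// sharedBuffer is a hypothetical reference-counted buffer.
type sharedBuffer struct {
	data []byte
	refs atomic.Int32
	pool *bufferPool
}

// Get returns an empty buffer that has exactly one owner.
func (p *bufferPool) Get() *sharedBuffer {
	p.mu.Lock()
	defer p.mu.Unlock()
	var b *sharedBuffer
	if n := len(p.free); n > 0 {
		b = p.free[n-1]
		p.free = p.free[:n-1]
	} else {
		b = &sharedBuffer{pool: p}
	}
	b.data = b.data[:0]
	b.refs.Store(1)
	return b
}

// Idle reports how many buffers are currently back in the pool.
func (p *bufferPool) Idle() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return len(p.free)
}

// Retain registers one more holder of the buffer.
func (b *sharedBuffer) Retain() { b.refs.Add(1) }

// Release drops one holder; only the last release returns the buffer.
func (b *sharedBuffer) Release() {
	if b.refs.Add(-1) == 0 {
		b.pool.mu.Lock()
		b.pool.free = append(b.pool.free, b)
		b.pool.mu.Unlock()
	}
}

// simulateTimeoutBeforeWrite replays the scenario: the request is queued on
// the connection, the producer times it out, and the write happens later.
func simulateTimeoutBeforeWrite() (idleAfterTimeout, idleAfterWrite int, written string) {
	pool := &bufferPool{}
	queue := make(chan *sharedBuffer, 10) // stand-in for incomingRequestsCh

	buf := pool.Get() // the producer owns the first reference
	buf.data = append(buf.data, "hello"...)
	buf.Retain() // the connection queue takes its own reference
	queue <- buf

	buf.Release() // producer side: request treated as timed out
	idleAfterTimeout = pool.Idle()

	queued := <-queue // connection side: the late write
	written = string(queued.data)
	queued.Release()
	idleAfterWrite = pool.Idle()
	return
}

func main() {
	fmt.Println(simulateTimeoutBeforeWrite())
}
```

In this sketch the timed-out buffer stays out of the pool until the connection
has finished with it, so a later `Get` cannot hand the same memory to another
message while the write is still pending. Dropping timed-out requests from the
connection queue before they are written would be another option.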
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.