gunli commented on code in PR #1333:
URL: https://github.com/apache/pulsar-client-go/pull/1333#discussion_r1960834782
##########
pulsar/producer_partition.go:
##########
@@ -581,7 +581,8 @@ func (p *partitionProducer) runEventsLoop() {
}
case connectionClosed := <-p.connectClosedCh:
p.log.Info("runEventsLoop will reconnect in producer")
- p.reconnectToBroker(connectionClosed)
+ // reconnect to broker in a new goroutine so that it won't block the event loop, see issue #1332
+ go p.reconnectToBroker(connectionClosed)
Review Comment:
> Even with the check below, we can't be 100% sure of using the old connection, because it's inherently concurrent.
>
> ```
> conn := p._getConn()
> if conn.Closed() {
> return
> }
> ```
I agree with that. Actually, I think pulsar-client-go is not well designed in
this respect. It is quite strange to pass a buffer to a connection's channel
when sending a message: the buffer is then read and written by two different
goroutines, which leads to a data race. What's worse, if a buffer is sent into
a connection's channel while the connection is being closed, the buffer ends up
in a new, uncertain (pending) state, and we have to handle that situation
separately. Currently, this situation is not handled when the connection is
closed by the network or by a server notification
(pb.BaseCommand_CLOSE_PRODUCER/pb.BaseCommand_CLOSE_CONSUMER); maybe we need a
new PR to handle it.
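Two things would address the problems above. First, never share a buffer across goroutines after handing it off. Second, never silently drop requests that were still queued when the connection went away. The sketch below illustrates both. All names are hypothetical: `pendingWrite`, `handOff` and `drainUnsent` are not the client's actual API. In this sketch, `main` closes the channel itself to simulate the connection shutting down.

```go
package main

import "fmt"

// pendingWrite is a hypothetical request handed from the producer goroutine
// to the goroutine that writes to the socket.
type pendingWrite struct {
	seqID uint64
	data  []byte
}

// handOff queues a private copy of data, so the caller may keep reusing its
// own buffer without racing with the writer goroutine. The alternative is a
// strict rule that the caller never touches data again after the send.
func handOff(ch chan<- pendingWrite, seqID uint64, data []byte) {
	cp := append([]byte(nil), data...)
	ch <- pendingWrite{seqID: seqID, data: cp}
}

// drainUnsent hands every request still queued on a closed channel back to
// its owner (e.g. so the producer can resend it on the next connection)
// instead of dropping it. The channel must already be closed, otherwise the
// range loop blocks forever.
func drainUnsent(ch <-chan pendingWrite, giveBack func(pendingWrite)) {
	for w := range ch {
		giveBack(w)
	}
}

func main() {
	ch := make(chan pendingWrite, 4)
	buf := []byte("m1")
	handOff(ch, 1, buf)
	buf[0] = 'X' // safe: the queued request holds its own copy
	handOff(ch, 2, []byte("m2"))

	// Simulate the connection shutting down before its writer read them.
	close(ch)

	drainUnsent(ch, func(w pendingWrite) {
		fmt.Println(w.seqID, string(w.data))
	})
	// Output:
	// 1 m1
	// 2 m2
}
```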
In my opinion, it would be better to keep the message in the pending queue
and use timeout events and server ack events to determine whether a message has
timed out or succeeded.
--
This is an automated message from the Apache Git Service.