This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
     new 81f3830  [Issue 644] Drain connection requests channel without closing (#645)
81f3830 is described below

commit 81f383018c50c05d5d98bef757e6df039e864034
Author: cckellogg <cckell...@gmail.com>
AuthorDate: Wed Oct 20 15:11:07 2021 -0400

    [Issue 644] Drain connection requests channel without closing (#645)
---
 pulsar/internal/connection.go | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 163dcac..57067ca 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -347,12 +347,21 @@ func (c *connection) failLeftRequestsWhenClose() {
 	// and closing the channel
 	c.incomingRequestsWG.Wait()
 
-	reqLen := len(c.incomingRequestsCh)
-	for i := 0; i < reqLen; i++ {
-		c.internalSendRequest(<-c.incomingRequestsCh)
+	ch := c.incomingRequestsCh
+	go func() {
+		// send a nil message to drain instead of
+		// closing the channel and causing a potential panic
+		//
+		// if other requests come in after the nil message
+		// then the RPC client will time out
+		ch <- nil
+	}()
+	for req := range ch {
+		if nil == req {
+			break // we have drained the requests
+		}
+		c.internalSendRequest(req)
 	}
-
-	close(c.incomingRequestsCh)
 }
 
 func (c *connection) run() {