This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 81f3830  [Issue 644] Drain connection requests channel without closing (#645)
81f3830 is described below

commit 81f383018c50c05d5d98bef757e6df039e864034
Author: cckellogg <cckell...@gmail.com>
AuthorDate: Wed Oct 20 15:11:07 2021 -0400

    [Issue 644] Drain connection requests channel without closing (#645)
---
 pulsar/internal/connection.go | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 163dcac..57067ca 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -347,12 +347,21 @@ func (c *connection) failLeftRequestsWhenClose() {
        // and closing the channel
        c.incomingRequestsWG.Wait()
 
-       reqLen := len(c.incomingRequestsCh)
-       for i := 0; i < reqLen; i++ {
-               c.internalSendRequest(<-c.incomingRequestsCh)
+       ch := c.incomingRequestsCh
+       go func() {
+               // send a nil message to drain instead of
+               // closing the channel and causing a potential panic
+               //
+               // if other requests come in after the nil message
+               // then the RPC client will time out
+               ch <- nil
+       }()
+       for req := range ch {
+               if nil == req {
+                       break // we have drained the requests
+               }
+               c.internalSendRequest(req)
        }
-
-       close(c.incomingRequestsCh)
 }
 
 func (c *connection) run() {

Reply via email to