This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new c187548  fix leaked nack tracker goroutine (#418)
c187548 is described below

commit c1875485e1adee892fd16374c1e8d21a1a31f4dd
Author: Ming <itestmyc...@gmail.com>
AuthorDate: Wed Dec 16 17:05:59 2020 -0500

    fix leaked nack tracker goroutine (#418)
    
    * fix leaked nack tracker goroutine
    
    * update testcase to ensure no panic
---
 pulsar/consumer_partition.go         |  7 +++++++
 pulsar/negative_acks_tracker.go      | 12 +++++++++---
 pulsar/negative_acks_tracker_test.go |  2 ++
 3 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 285cf29..d970f65 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -159,6 +159,7 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
        err := pc.grabConn()
        if err != nil {
                pc.log.WithError(err).Error("Failed to create consumer")
+               pc.nackTracker.Close()
                return nil, err
        }
        pc.log.Info("Created consumer")
@@ -167,6 +168,7 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
        if pc.options.startMessageIDInclusive && pc.startMessageID == 
lastestMessageID {
                msgID, err := pc.requestGetLastMessageID()
                if err != nil {
+                       pc.nackTracker.Close()
                        return nil, err
                }
                if msgID.entryID != noMessageEntry {
@@ -174,6 +176,7 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
 
                        err = pc.requestSeek(msgID.messageID)
                        if err != nil {
+                               pc.nackTracker.Close()
                                return nil, err
                        }
                }
@@ -739,6 +742,10 @@ func (pc *partitionConsumer) runEventsLoop() {
 func (pc *partitionConsumer) internalClose(req *closeRequest) {
        defer close(req.doneCh)
        if pc.state != consumerReady {
+               // this might be redundant but to ensure nack tracker is closed
+               if pc.nackTracker != nil {
+                       pc.nackTracker.Close()
+               }
                return
        }
 
diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go
index 23a8a91..e10ab49 100644
--- a/pulsar/negative_acks_tracker.go
+++ b/pulsar/negative_acks_tracker.go
@@ -32,6 +32,7 @@ type negativeAcksTracker struct {
        sync.Mutex
 
        doneCh       chan interface{}
+       doneOnce     sync.Once
        negativeAcks map[messageID]time.Time
        rc           redeliveryConsumer
        tick         *time.Ticker
@@ -84,10 +85,11 @@ func (t *negativeAcksTracker) track() {
 
                case <-t.tick.C:
                        {
-                               t.Lock()
-
                                now := time.Now()
                                msgIds := make([]messageID, 0)
+
+                               t.Lock()
+
                                for msgID, targetTime := range t.negativeAcks {
                                        t.log.Debugf("MsgId: %v -- targetTime: 
%v -- now: %v", msgID, targetTime, now)
                                        if targetTime.Before(now) {
@@ -109,5 +111,9 @@ func (t *negativeAcksTracker) track() {
 }
 
 func (t *negativeAcksTracker) Close() {
-       t.doneCh <- nil
+       // allow Close() to be invoked multiple times by consumer_partition to 
avoid panic
+       t.doneOnce.Do(func() {
+               t.tick.Stop()
+               t.doneCh <- nil
+       })
 }
diff --git a/pulsar/negative_acks_tracker_test.go 
b/pulsar/negative_acks_tracker_test.go
index 27e07d8..e587f3f 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -102,6 +102,8 @@ func TestNacksTracker(t *testing.T) {
        assert.Equal(t, int64(2), msgIds[1].entryID)
 
        nacks.Close()
+       // allow multiple Close without panicing
+       nacks.Close()
 }
 
 func TestNacksWithBatchesTracker(t *testing.T) {

Reply via email to