This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new c187548 fix leaked nack tracker goroutine (#418) c187548 is described below commit c1875485e1adee892fd16374c1e8d21a1a31f4dd Author: Ming <itestmyc...@gmail.com> AuthorDate: Wed Dec 16 17:05:59 2020 -0500 fix leaked nack tracker goroutine (#418) * fix leaked nack tracker goroutine * update testcase to ensure no panic --- pulsar/consumer_partition.go | 7 +++++++ pulsar/negative_acks_tracker.go | 12 +++++++++--- pulsar/negative_acks_tracker_test.go | 2 ++ 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 285cf29..d970f65 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -159,6 +159,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon err := pc.grabConn() if err != nil { pc.log.WithError(err).Error("Failed to create consumer") + pc.nackTracker.Close() return nil, err } pc.log.Info("Created consumer") @@ -167,6 +168,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon if pc.options.startMessageIDInclusive && pc.startMessageID == lastestMessageID { msgID, err := pc.requestGetLastMessageID() if err != nil { + pc.nackTracker.Close() return nil, err } if msgID.entryID != noMessageEntry { @@ -174,6 +176,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon err = pc.requestSeek(msgID.messageID) if err != nil { + pc.nackTracker.Close() return nil, err } } @@ -739,6 +742,10 @@ func (pc *partitionConsumer) runEventsLoop() { func (pc *partitionConsumer) internalClose(req *closeRequest) { defer close(req.doneCh) if pc.state != consumerReady { + // this might be redundant but to ensure nack tracker is closed + if pc.nackTracker != nil { + pc.nackTracker.Close() + } return } diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go index 23a8a91..e10ab49 100644 --- a/pulsar/negative_acks_tracker.go +++ b/pulsar/negative_acks_tracker.go @@ -32,6 +32,7 @@ type negativeAcksTracker struct { sync.Mutex doneCh chan interface{} + doneOnce sync.Once negativeAcks map[messageID]time.Time rc redeliveryConsumer tick *time.Ticker @@ -84,10 +85,11 @@ func (t *negativeAcksTracker) track() { case <-t.tick.C: { - t.Lock() - now := time.Now() msgIds := make([]messageID, 0) + + t.Lock() + for msgID, targetTime := range t.negativeAcks { t.log.Debugf("MsgId: %v -- targetTime: %v -- now: %v", msgID, targetTime, now) if targetTime.Before(now) { @@ -109,5 +111,9 @@ func (t *negativeAcksTracker) track() { } func (t *negativeAcksTracker) Close() { - t.doneCh <- nil + // allow Close() to be invoked multiple times by consumer_partition to avoid panic + t.doneOnce.Do(func() { + t.tick.Stop() + t.doneCh <- nil + }) } diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go index 27e07d8..e587f3f 100644 --- a/pulsar/negative_acks_tracker_test.go +++ b/pulsar/negative_acks_tracker_test.go @@ -102,6 +102,8 @@ func TestNacksTracker(t *testing.T) { assert.Equal(t, int64(2), msgIds[1].entryID) nacks.Close() + // allow multiple Close without panicing + nacks.Close() } func TestNacksWithBatchesTracker(t *testing.T) {