This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new 98338e5 fix resend pendingItems race condition (#551) 98338e5 is described below commit 98338e544d31570504cc06f4da63da47cbb8e628 Author: Rui Fu <freez...@users.noreply.github.com> AuthorDate: Thu Jun 24 16:36:49 2021 +0800 fix resend pendingItems race condition (#551) --- pulsar/producer_partition.go | 42 +++++++++++++++++++++++++++++++++--------- 1 file changed, 33 insertions(+), 9 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index b365f83..19aa06e 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -231,10 +231,29 @@ func (p *partitionProducer) grabCnx() error { p.log.WithField("cnx", res.Cnx.ID()).Debug("Connected producer") pendingItems := p.pendingQueue.ReadableSlice() - if len(pendingItems) > 0 { - p.log.Infof("Resending %d pending batches", len(pendingItems)) - for _, pi := range pendingItems { - p.cnx.WriteData(pi.(*pendingItem).batchData) + viewSize := len(pendingItems) + if viewSize > 0 { + p.log.Infof("Resending %d pending batches", viewSize) + lastViewItem := pendingItems[viewSize-1].(*pendingItem) + + // iterate at most pending items + for i := 0; i < viewSize; i++ { + item := p.pendingQueue.Poll() + if item == nil { + continue + } + pi := item.(*pendingItem) + // when resending pending batches, we update the sendAt timestamp and put to the back of queue + // to avoid pending item been removed by failTimeoutMessages and cause race condition + pi.Lock() + pi.sentAt = time.Now() + pi.Unlock() + p.pendingQueue.Put(pi) + p.cnx.WriteData(pi.batchData) + + if pi == lastViewItem { + break + } } } return nil @@ -523,8 +542,7 @@ func (p *partitionProducer) failTimeoutMessages() { } // flag the send has completed with error, flush make no effect - pi.completed = true - buffersPool.Put(pi.batchData) + pi.Complete() pi.Unlock() // finally reached the last view item, current iteration ends @@ -706,9 +724,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) } // Mark this pending item as done - pi.completed = true - // Return buffer to the pool since we're now done using it - buffersPool.Put(pi.batchData) + pi.Complete() } func (p *partitionProducer) internalClose(req *closeProducer) { @@ -800,3 +816,11 @@ type flushRequest struct { waitGroup *sync.WaitGroup err error } + +func (i *pendingItem) Complete() { + if i.completed { + return + } + i.completed = true + buffersPool.Put(i.batchData) +}