This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 98338e5  fix resend pendingItems race condition (#551)
98338e5 is described below

commit 98338e544d31570504cc06f4da63da47cbb8e628
Author: Rui Fu <freez...@users.noreply.github.com>
AuthorDate: Thu Jun 24 16:36:49 2021 +0800

    fix resend pendingItems race condition (#551)
---
 pulsar/producer_partition.go | 42 +++++++++++++++++++++++++++++++++---------
 1 file changed, 33 insertions(+), 9 deletions(-)
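
For context on the race being fixed: as the diff's comments note, failTimeoutMessages fails any queued item whose sentAt is older than the send timeout and returns its buffer to the pool. A batch resent after reconnect with its original sentAt could therefore be failed, and its buffer reclaimed, while the new connection was still writing it. A rough sketch of that hazard, using simplified, hypothetical types rather than the actual producer internals:

    package main

    import (
        "fmt"
        "time"
    )

    // pendingItem is a simplified stand-in for the producer's pending batch.
    type pendingItem struct {
        sentAt time.Time
    }

    // sendTimeout is an assumed value; the real timeout is configurable.
    const sendTimeout = 30 * time.Second

    // expired mirrors the failTimeoutMessages condition: items older than
    // the send timeout are failed and their buffers reclaimed.
    func expired(pi *pendingItem, now time.Time) bool {
        return now.Sub(pi.sentAt) > sendTimeout
    }

    func main() {
        // A batch first sent a minute ago, before the reconnect.
        pi := &pendingItem{sentAt: time.Now().Add(-time.Minute)}

        // Without the fix: the resent item keeps its old sentAt, so a
        // concurrent failTimeoutMessages pass could fail it mid-resend.
        fmt.Println("stale item expired?", expired(pi, time.Now())) // true

        // With the fix: grabCnx refreshes sentAt before re-enqueueing,
        // giving the in-flight write a full timeout window again.
        pi.sentAt = time.Now()
        fmt.Println("refreshed item expired?", expired(pi, time.Now())) // false
    }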

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index b365f83..19aa06e 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -231,10 +231,29 @@ func (p *partitionProducer) grabCnx() error {
        p.log.WithField("cnx", res.Cnx.ID()).Debug("Connected producer")
 
        pendingItems := p.pendingQueue.ReadableSlice()
-       if len(pendingItems) > 0 {
-               p.log.Infof("Resending %d pending batches", len(pendingItems))
-               for _, pi := range pendingItems {
-                       p.cnx.WriteData(pi.(*pendingItem).batchData)
+       viewSize := len(pendingItems)
+       if viewSize > 0 {
+               p.log.Infof("Resending %d pending batches", viewSize)
+               lastViewItem := pendingItems[viewSize-1].(*pendingItem)
+
+               // iterate over at most viewSize items
+               for i := 0; i < viewSize; i++ {
+                       item := p.pendingQueue.Poll()
+                       if item == nil {
+                               continue
+                       }
+                       pi := item.(*pendingItem)
+                       // when resending pending batches, we update the sentAt
+                       // timestamp and put the item back on the queue, to avoid
+                       // the pending item being removed by failTimeoutMessages,
+                       // which would cause a race condition
+                       pi.Lock()
+                       pi.sentAt = time.Now()
+                       pi.Unlock()
+                       p.pendingQueue.Put(pi)
+                       p.cnx.WriteData(pi.batchData)
+
+                       if pi == lastViewItem {
+                               break
+                       }
                }
        }
        return nil
@@ -523,8 +542,7 @@ func (p *partitionProducer) failTimeoutMessages() {
                        }
 
                        // flag that the send has completed with an error; flush has no effect
-                       pi.completed = true
-                       buffersPool.Put(pi.batchData)
+                       pi.Complete()
                        pi.Unlock()
 
                        // finally reached the last view item, current iteration ends
@@ -706,9 +724,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
        }
 
        // Mark this pending item as done
-       pi.completed = true
-       // Return buffer to the pool since we're now done using it
-       buffersPool.Put(pi.batchData)
+       pi.Complete()
 }
 
 func (p *partitionProducer) internalClose(req *closeProducer) {
@@ -800,3 +816,11 @@ type flushRequest struct {
        waitGroup *sync.WaitGroup
        err       error
 }
+
+func (i *pendingItem) Complete() {
+       if i.completed {
+               return
+       }
+       i.completed = true
+       buffersPool.Put(i.batchData)
+}
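
The new pendingItem.Complete() helper also makes completion idempotent: both ReceivedSendReceipt and failTimeoutMessages can now finish the same item, and without the completed guard each path would call buffersPool.Put(pi.batchData) itself, potentially returning the same buffer to the pool twice. A minimal sketch of the pattern, with a hypothetical *[]byte standing in for the library's internal buffer type:

    package main

    import (
        "fmt"
        "sync"
    )

    // buffersPool is a stand-in for the client's shared buffer pool.
    var buffersPool = sync.Pool{New: func() interface{} { return new([]byte) }}

    type pendingItem struct {
        sync.Mutex
        completed bool
        batchData *[]byte // hypothetical; the real field is an internal buffer type
    }

    // Complete is safe to call more than once; only the first call
    // returns batchData to the pool.
    func (i *pendingItem) Complete() {
        if i.completed {
            return
        }
        i.completed = true
        buffersPool.Put(i.batchData)
    }

    func main() {
        pi := &pendingItem{batchData: buffersPool.Get().(*[]byte)}

        // Simulate ReceivedSendReceipt and failTimeoutMessages racing to
        // complete the same item; each takes the item lock before
        // completing, mirroring failTimeoutMessages above.
        var wg sync.WaitGroup
        for i := 0; i < 2; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                pi.Lock()
                pi.Complete()
                pi.Unlock()
            }()
        }
        wg.Wait()
        fmt.Println("buffer returned exactly once; completed:", pi.completed)
    }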
