This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new f42fdde1 [Improve] pool sendRequest to improve producer perf (#1126)
f42fdde1 is described below

commit f42fdde157ce7d5a0d329e23b5b22e97044e4127
Author: gunli <[email protected]>
AuthorDate: Fri Nov 10 18:18:43 2023 +0800

    [Improve] pool sendRequest to improve producer perf (#1126)
    
    ### Motivation
    
    `sendRequest` in the producer is a frequently allocated struct; pooling it 
reduces memory allocation.
    
    ### Modifications
    
    1. Initialize a sync.Pool;
    2. Get a sendRequest from the pool when one is needed;
    3. Reset the sendRequest and put it back into the pool when it is done.
    ---------
    
    Co-authored-by: gunli <[email protected]>
---
 pulsar/producer_partition.go | 27 ++++++++++++++++++++++++---
 1 file changed, 24 insertions(+), 3 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 8608df3d..9c7c1c09 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -60,11 +60,20 @@ var (
        errProducerClosed     = newError(ProducerClosed, "producer already been 
closed")
        errMemoryBufferIsFull = newError(ClientMemoryBufferIsFull, "client 
memory buffer is full")
 
-       buffersPool sync.Pool
+       buffersPool     sync.Pool
+       sendRequestPool *sync.Pool
 )
 
 var errTopicNotFount = "TopicNotFound"
 
+func init() {
+       sendRequestPool = &sync.Pool{
+               New: func() interface{} {
+                       return &sendRequest{}
+               },
+       }
+}
+
 type partitionProducer struct {
        state  uAtomic.Int32
        client *client
@@ -527,7 +536,9 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
                }
                // update chunk id
                sr.mm.ChunkId = proto.Int32(int32(chunkID))
-               nsr := &sendRequest{
+               nsr := sendRequestPool.Get().(*sendRequest)
+               *nsr = sendRequest{
+                       pool:                sendRequestPool,
                        ctx:                 sr.ctx,
                        msg:                 sr.msg,
                        producer:            sr.producer,
@@ -1150,7 +1161,9 @@ func (p *partitionProducer) internalSendAsync(
                return
        }
 
-       sr := &sendRequest{
+       sr := sendRequestPool.Get().(*sendRequest)
+       *sr = sendRequest{
+               pool:             sendRequestPool,
                ctx:              ctx,
                msg:              msg,
                producer:         p,
@@ -1395,6 +1408,7 @@ func (p *partitionProducer) Close() {
 }
 
 type sendRequest struct {
+       pool             *sync.Pool
        ctx              context.Context
        msg              *ProducerMessage
        producer         *partitionProducer
@@ -1477,6 +1491,13 @@ func (sr *sendRequest) done(msgID MessageID, err error) {
                        sr.transaction.endSendOrAckOp(err)
                }
        }
+
+       pool := sr.pool
+       if pool != nil {
+               // reset all the fields
+               *sr = sendRequest{}
+               pool.Put(sr)
+       }
 }
 
 func (p *partitionProducer) blockIfQueueFull() bool {

Reply via email to