This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new f42fdde1 [Improve] pool sendRequest to improve producer perf (#1126)
f42fdde1 is described below
commit f42fdde157ce7d5a0d329e23b5b22e97044e4127
Author: gunli <[email protected]>
AuthorDate: Fri Nov 10 18:18:43 2023 +0800
[Improve] pool sendRequest to improve producer perf (#1126)
### Motivation
`sendRequest` in the producer is a frequently allocated struct; pooling it can
decrease memory allocations.
### Modifications
1. Initialize a `sync.Pool` for `sendRequest` objects.
2. Get a `sendRequest` from the pool whenever one is needed.
3. Reset the `sendRequest` and put it back into the pool when it is done.
---------
Co-authored-by: gunli <[email protected]>
---
pulsar/producer_partition.go | 27 ++++++++++++++++++++++++---
1 file changed, 24 insertions(+), 3 deletions(-)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 8608df3d..9c7c1c09 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -60,11 +60,20 @@ var (
errProducerClosed = newError(ProducerClosed, "producer already been
closed")
errMemoryBufferIsFull = newError(ClientMemoryBufferIsFull, "client
memory buffer is full")
- buffersPool sync.Pool
+ buffersPool sync.Pool
+ sendRequestPool *sync.Pool
)
var errTopicNotFount = "TopicNotFound"
+func init() {
+ sendRequestPool = &sync.Pool{
+ New: func() interface{} {
+ return &sendRequest{}
+ },
+ }
+}
+
type partitionProducer struct {
state uAtomic.Int32
client *client
@@ -527,7 +536,9 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
}
// update chunk id
sr.mm.ChunkId = proto.Int32(int32(chunkID))
- nsr := &sendRequest{
+ nsr := sendRequestPool.Get().(*sendRequest)
+ *nsr = sendRequest{
+ pool: sendRequestPool,
ctx: sr.ctx,
msg: sr.msg,
producer: sr.producer,
@@ -1150,7 +1161,9 @@ func (p *partitionProducer) internalSendAsync(
return
}
- sr := &sendRequest{
+ sr := sendRequestPool.Get().(*sendRequest)
+ *sr = sendRequest{
+ pool: sendRequestPool,
ctx: ctx,
msg: msg,
producer: p,
@@ -1395,6 +1408,7 @@ func (p *partitionProducer) Close() {
}
type sendRequest struct {
+ pool *sync.Pool
ctx context.Context
msg *ProducerMessage
producer *partitionProducer
@@ -1477,6 +1491,13 @@ func (sr *sendRequest) done(msgID MessageID, err error) {
sr.transaction.endSendOrAckOp(err)
}
}
+
+ pool := sr.pool
+ if pool != nil {
+ // reset all the fields
+ *sr = sendRequest{}
+ pool.Put(sr)
+ }
}
func (p *partitionProducer) blockIfQueueFull() bool {