This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cbe36f  Share buffer pool across all partitions (#310)
9cbe36f is described below

commit 9cbe36f0e8d9ee3547bd2a38eee6efa3b2ad04c5
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Wed Jul 8 19:30:31 2020 -0700

    Share buffer pool across all partitions (#310)
    
    ## Motivation
    
    When a producer is publishing on many partitions, there can be significant
    memory overhead in maintaining a per-partition pool. Using a single shared
    buffer pool instead has no significant performance impact.
---
 perf/perf-producer.go            |  4 ++++
 pulsar/internal/batch_builder.go |  3 +++
 pulsar/producer_partition.go     | 32 +++++++++++++++-----------------
 3 files changed, 22 insertions(+), 17 deletions(-)
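
For context, a minimal, self-contained sketch of the shared sync.Pool pattern
this patch moves to (the *bytes.Buffer stand-in, the names, and the sizing
numbers are illustrative, not the actual pulsar/internal Buffer API):

package main

import (
	"bytes"
	"fmt"
	"sync"
)

// A single package-level pool shared by all partition producers,
// replacing the previous per-partition pools. With no New function,
// Get returns nil on a miss and the caller decides how large a
// fresh buffer should be.
var sharedBuffersPool sync.Pool

// getBuffer mirrors partitionProducer.GetBuffer in the patch: reuse a
// pooled buffer when one exists, otherwise signal a miss with nil.
func getBuffer() *bytes.Buffer {
	b, ok := sharedBuffersPool.Get().(*bytes.Buffer)
	if ok {
		b.Reset() // the sketch's equivalent of Buffer.Clear()
		return b
	}
	return nil
}

func main() {
	payload := []byte("serialized batch")

	buf := getBuffer()
	if buf == nil {
		// Pool miss: allocate with some headroom, analogous to
		// NewBuffer(uncompressedSize * 3 / 2) in batch_builder.go.
		buf = bytes.NewBuffer(make([]byte, 0, len(payload)*3/2))
	}
	buf.Write(payload)
	fmt.Println("flushed", buf.Len(), "bytes")

	// Once the broker acks the batch, the buffer returns to the shared
	// pool and becomes available to any other partition.
	sharedBuffersPool.Put(buf)
}

The practical effect is that idle partitions no longer pin their own
pre-allocated buffers; capacity is recycled wherever the next flush happens.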

diff --git a/perf/perf-producer.go b/perf/perf-producer.go
index 0085c07..ba6e197 100644
--- a/perf/perf-producer.go
+++ b/perf/perf-producer.go
@@ -36,6 +36,7 @@ type ProduceArgs struct {
        Topic              string
        Rate               int
        BatchingTimeMillis int
+       BatchingMaxSize    int
        MessageSize        int
        ProducerQueueSize  int
 }
@@ -62,6 +63,8 @@ func newProducerCommand() *cobra.Command {
                "Publish rate. Set to 0 to go unthrottled")
        flags.IntVarP(&produceArgs.BatchingTimeMillis, "batching-time", "b", 1,
                "Batching grouping time in millis")
+       flags.IntVarP(&produceArgs.BatchingMaxSize, "batching-max-size", "", 128,
+               "Max size of a batch (in KB)")
        flags.IntVarP(&produceArgs.MessageSize, "size", "s", 1024,
                "Message size")
        flags.IntVarP(&produceArgs.ProducerQueueSize, "queue-size", "q", 1000,
@@ -86,6 +89,7 @@ func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {
                Topic:                   produceArgs.Topic,
                MaxPendingMessages:      produceArgs.ProducerQueueSize,
                BatchingMaxPublishDelay: time.Millisecond * time.Duration(produceArgs.BatchingTimeMillis),
+               BatchingMaxSize:         uint(produceArgs.BatchingMaxSize * 1024),
        })
        if err != nil {
                log.Fatal(err)
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index a88bada..18ec3c4 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -170,6 +170,9 @@ func (bb *BatchBuilder) Flush() (batchData Buffer, sequenceID uint64, callbacks
        bb.msgMetadata.UncompressedSize = &uncompressedSize
 
        buffer := bb.buffersPool.GetBuffer()
+       if buffer == nil {
+               buffer = NewBuffer(int(uncompressedSize * 3 / 2))
+       }
        serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, bb.buffer, bb.compressionProvider)
 
        callbacks = bb.callbacks
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 239fcd6..9243804 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -45,6 +45,8 @@ const (
 var (
        errFailAddBatch    = errors.New("message send failed")
        errMessageTooLarge = errors.New("message size exceeds MaxMessageSize")
+
+       buffersPool sync.Pool
 )
 
 type partitionProducer struct {
@@ -62,8 +64,7 @@ type partitionProducer struct {
        batchFlushTicker    *time.Ticker
 
        // Channel where app is posting messages to be published
-       eventsChan  chan interface{}
-       buffersPool sync.Pool
+       eventsChan chan interface{}
 
        publishSemaphore internal.Semaphore
        pendingQueue     internal.BlockingQueue
@@ -89,18 +90,13 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
        }
 
        p := &partitionProducer{
-               state:      producerInit,
-               log:        log.WithField("topic", topic),
-               client:     client,
-               topic:      topic,
-               options:    options,
-               producerID: client.rpcClient.NewProducerID(),
-               eventsChan: make(chan interface{}, maxPendingMessages),
-               buffersPool: sync.Pool{
-                       New: func() interface{} {
-                               return internal.NewBuffer(1024)
-                       },
-               },
+               state:            producerInit,
+               log:              log.WithField("topic", topic),
+               client:           client,
+               topic:            topic,
+               options:          options,
+               producerID:       client.rpcClient.NewProducerID(),
+               eventsChan:       make(chan interface{}, maxPendingMessages),
                batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
                publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
                pendingQueue:     internal.NewBlockingQueue(maxPendingMessages),
@@ -189,8 +185,10 @@ func (p *partitionProducer) grabCnx() error {
 type connectionClosed struct{}
 
 func (p *partitionProducer) GetBuffer() internal.Buffer {
-       b := p.buffersPool.Get().(internal.Buffer)
-       b.Clear()
+       b, ok := buffersPool.Get().(internal.Buffer)
+       if ok {
+               b.Clear()
+       }
        return b
 }
 
@@ -452,7 +450,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
        // Mark this pending item as done
        pi.completed = true
        // Return buffer to the pool since we're now done using it
-       p.buffersPool.Put(pi.batchData)
+       buffersPool.Put(pi.batchData)
 }
 
 func (p *partitionProducer) internalClose(req *closeProducer) {
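
Outside the perf tool, the same knob is the ProducerOptions.BatchingMaxSize
field that the perf change above wires up. A hedged sketch of setting it from
application code follows; the broker URL, topic name, and the 128 KB figure
are placeholders, and error handling on Send is elided to keep the sketch
short (check its return value(s) in real code):

package main

import (
	"context"
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	// Placeholder broker URL; point this at a real cluster.
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:                   "my-topic",
		BatchingMaxPublishDelay: time.Millisecond,
		// Cap each batch at 128 KB, matching the perf tool's new
		// --batching-max-size default.
		BatchingMaxSize: 128 * 1024,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// Return values intentionally ignored in this sketch.
	producer.Send(context.Background(), &pulsar.ProducerMessage{
		Payload: []byte("hello"),
	})
}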
