This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
     new 9cbe36f  Share buffer pool across all partitions (#310)
9cbe36f is described below

commit 9cbe36f0e8d9ee3547bd2a38eee6efa3b2ad04c5
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Wed Jul 8 19:30:31 2020 -0700

    Share buffer pool across all partitions (#310)

    ## Motivation

    When a producer is publishing on many partitions, maintaining a separate
    buffer pool per partition can add significant memory overhead. Switching
    to a single shared buffer pool avoids this, with no significant
    performance impact.
---
 perf/perf-producer.go            |  4 ++++
 pulsar/internal/batch_builder.go |  3 +++
 pulsar/producer_partition.go     | 32 +++++++++++++++-----------------
 3 files changed, 22 insertions(+), 17 deletions(-)

diff --git a/perf/perf-producer.go b/perf/perf-producer.go
index 0085c07..ba6e197 100644
--- a/perf/perf-producer.go
+++ b/perf/perf-producer.go
@@ -36,6 +36,7 @@ type ProduceArgs struct {
     Topic              string
     Rate               int
     BatchingTimeMillis int
+    BatchingMaxSize    int
     MessageSize        int
     ProducerQueueSize  int
 }
@@ -62,6 +63,8 @@ func newProducerCommand() *cobra.Command {
         "Publish rate. Set to 0 to go unthrottled")
     flags.IntVarP(&produceArgs.BatchingTimeMillis, "batching-time", "b", 1,
         "Batching grouping time in millis")
+    flags.IntVarP(&produceArgs.BatchingMaxSize, "batching-max-size", "", 128,
+        "Max size of a batch (in KB)")
     flags.IntVarP(&produceArgs.MessageSize, "size", "s", 1024,
         "Message size")
     flags.IntVarP(&produceArgs.ProducerQueueSize, "queue-size", "q", 1000,
@@ -86,6 +89,7 @@ func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {
         Topic:                   produceArgs.Topic,
         MaxPendingMessages:      produceArgs.ProducerQueueSize,
         BatchingMaxPublishDelay: time.Millisecond * time.Duration(produceArgs.BatchingTimeMillis),
+        BatchingMaxSize:         uint(produceArgs.BatchingMaxSize * 1024),
     })
     if err != nil {
         log.Fatal(err)
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index a88bada..18ec3c4 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -170,6 +170,9 @@ func (bb *BatchBuilder) Flush() (batchData Buffer, sequenceID uint64, callbacks
     bb.msgMetadata.UncompressedSize = &uncompressedSize
 
     buffer := bb.buffersPool.GetBuffer()
+    if buffer == nil {
+        buffer = NewBuffer(int(uncompressedSize * 3 / 2))
+    }
     serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, bb.buffer, bb.compressionProvider)
 
     callbacks = bb.callbacks
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 239fcd6..9243804 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -45,6 +45,8 @@ const (
 var (
     errFailAddBatch    = errors.New("message send failed")
     errMessageTooLarge = errors.New("message size exceeds MaxMessageSize")
+
+    buffersPool sync.Pool
 )
 
 type partitionProducer struct {
@@ -62,8 +64,7 @@ type partitionProducer struct {
     batchFlushTicker *time.Ticker
 
     // Channel where app is posting messages to be published
-    eventsChan  chan interface{}
-    buffersPool sync.Pool
+    eventsChan chan interface{}
 
     publishSemaphore internal.Semaphore
     pendingQueue     internal.BlockingQueue
@@ -89,18 +90,13 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
     }
 
     p := &partitionProducer{
-        state:      producerInit,
-        log:        log.WithField("topic", topic),
-        client:     client,
-        topic:      topic,
-        options:    options,
-        producerID: client.rpcClient.NewProducerID(),
-        eventsChan: make(chan interface{}, maxPendingMessages),
-        buffersPool: sync.Pool{
-            New: func() interface{} {
-                return internal.NewBuffer(1024)
-            },
-        },
+        state:            producerInit,
+        log:              log.WithField("topic", topic),
+        client:           client,
+        topic:            topic,
+        options:          options,
+        producerID:       client.rpcClient.NewProducerID(),
+        eventsChan:       make(chan interface{}, maxPendingMessages),
         batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
         publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
         pendingQueue:     internal.NewBlockingQueue(maxPendingMessages),
@@ -189,8 +185,10 @@ func (p *partitionProducer) grabCnx() error {
 type connectionClosed struct{}
 
 func (p *partitionProducer) GetBuffer() internal.Buffer {
-    b := p.buffersPool.Get().(internal.Buffer)
-    b.Clear()
+    b, ok := buffersPool.Get().(internal.Buffer)
+    if ok {
+        b.Clear()
+    }
     return b
 }
 
@@ -452,7 +450,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
     // Mark this pending item as done
     pi.completed = true
     // Return buffer to the pool since we're now done using it
-    p.buffersPool.Put(pi.batchData)
+    buffersPool.Put(pi.batchData)
 }
 
 func (p *partitionProducer) internalClose(req *closeProducer) {
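For readers skimming the diff: the change replaces the per-partitionProducer sync.Pool (which had a New func returning a small preallocated buffer) with one package-level pool that has no New func, so a Get on an empty pool yields nil and the BatchBuilder falls back to allocating a buffer sized from the batch it is about to serialize. Below is a minimal, self-contained sketch of that pattern under assumed names; the Buffer type and the getBuffer/putBuffer helpers are hypothetical stand-ins for illustration, not the pulsar-client-go API.

```go
package main

import (
	"fmt"
	"sync"
)

// Buffer is a hypothetical stand-in for the library's internal.Buffer.
type Buffer struct {
	data []byte
}

// Clear resets the buffer for reuse without freeing its backing array.
func (b *Buffer) Clear() { b.data = b.data[:0] }

// A single package-level pool shared by every partition. There is no New
// func on purpose: an empty pool yields nil and the caller chooses the
// size of the fallback allocation.
var buffersPool sync.Pool

// getBuffer returns a cleared pooled buffer, or nil if the pool is empty.
func getBuffer() *Buffer {
	if b, ok := buffersPool.Get().(*Buffer); ok {
		b.Clear()
		return b
	}
	return nil
}

// putBuffer hands a buffer back to the shared pool once the send completes.
func putBuffer(b *Buffer) {
	buffersPool.Put(b)
}

func main() {
	payloadSize := 1024

	b := getBuffer()
	if b == nil {
		// Fallback allocation with some headroom, analogous to the
		// NewBuffer(uncompressedSize * 3 / 2) fallback in the diff above.
		b = &Buffer{data: make([]byte, 0, payloadSize*3/2)}
	}

	b.data = append(b.data, make([]byte, payloadSize)...)
	fmt.Println("serialized", len(b.data), "bytes")

	putBuffer(b) // the buffer is now reusable by any other partition
}
```

Omitting the New func keeps sizing decisions with the caller, which knows the uncompressed batch size at flush time, while the single shared pool lets a buffer released by one partition be reused by any other and lets the runtime reclaim idle buffers across GC cycles instead of holding one warm pool per partition.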