This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new f9969ca8 feat: add sendRequest.done() to release resource together (#1110) f9969ca8 is described below commit f9969ca84e927786ef208abb73de065eb6bfacae Author: gunli <24350...@qq.com> AuthorDate: Tue Oct 24 16:42:31 2023 +0800 feat: add sendRequest.done() to release resource together (#1110) Co-authored-by: gunli <gu...@tencent.com> --- pulsar/producer_partition.go | 93 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 79 insertions(+), 14 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 82123f98..2beb3614 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1419,21 +1419,37 @@ func (p *partitionProducer) Close() { <-cp.doneCh } +//nolint:all type sendRequest struct { - ctx context.Context - msg *ProducerMessage - callback func(MessageID, *ProducerMessage, error) - callbackOnce *sync.Once - publishTime time.Time - flushImmediately bool - blockCh chan struct{} - closeBlockChOnce *sync.Once - totalChunks int - chunkID int - uuid string - chunkRecorder *chunkRecorder - transaction *transaction - reservedMem int64 + ctx context.Context + msg *ProducerMessage + callback func(MessageID, *ProducerMessage, error) + callbackOnce *sync.Once + publishTime time.Time + flushImmediately bool + blockCh chan struct{} + closeBlockChOnce *sync.Once + totalChunks int + chunkID int + uuid string + chunkRecorder *chunkRecorder + transaction *transaction + reservedMem int64 + producer *partitionProducer + memLimit internal.MemoryLimitController + semaphore internal.Semaphore + reservedSemaphore int + sendAsBatch bool + schema Schema + schemaVersion []byte + uncompressedPayload []byte + uncompressedSize int64 + compressedPayload []byte + compressedSize int + payloadChunkSize int + mm *pb.MessageMetadata + deliverAt time.Time + maxMessageSize int32 } // stopBlock can be invoked multiple times safety @@ -1443,6 +1459,55 @@ func (sr *sendRequest) stopBlock() { }) } +//nolint:all +func (sr *sendRequest) done(msgID MessageID, err error) { + if err == nil { + sr.producer.metrics.PublishLatency.Observe(float64(time.Now().UnixNano()-sr.publishTime.UnixNano()) / 1.0e9) + sr.producer.metrics.MessagesPublished.Inc() + sr.producer.metrics.BytesPublished.Add(float64(sr.reservedMem)) + } + + if err != nil { + sr.producer.log.WithError(err). + WithField("size", sr.reservedMem). + WithField("properties", sr.msg.Properties) + } + + if err == errSendTimeout { + sr.producer.metrics.PublishErrorsTimeout.Inc() + } + + if err == errMessageTooLarge { + sr.producer.metrics.PublishErrorsMsgTooLarge.Inc() + } + + if sr.semaphore != nil { + for i := 0; i < sr.reservedSemaphore; i++ { + sr.semaphore.Release() + } + sr.producer.metrics.MessagesPending.Sub(float64(sr.reservedSemaphore)) + } + + if sr.memLimit != nil { + sr.memLimit.ReleaseMemory(sr.reservedMem) + sr.producer.metrics.BytesPending.Sub(float64(sr.reservedMem)) + } + + if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 { + sr.callbackOnce.Do(func() { + runCallback(sr.callback, msgID, sr.msg, err) + }) + + if sr.transaction != nil { + sr.transaction.endSendOrAckOp(err) + } + + if sr.producer.options.Interceptors != nil && err == nil { + sr.producer.options.Interceptors.OnSendAcknowledgement(sr.producer, sr.msg, msgID) + } + } +} + type closeProducer struct { doneCh chan struct{} }