This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new f9969ca8 feat: add sendRequest.done() to release resource together (#1110)
f9969ca8 is described below

commit f9969ca84e927786ef208abb73de065eb6bfacae
Author: gunli <24350...@qq.com>
AuthorDate: Tue Oct 24 16:42:31 2023 +0800

    feat: add sendRequest.done() to release resource together (#1110)
    
    Co-authored-by: gunli <gu...@tencent.com>
---
 pulsar/producer_partition.go | 93 +++++++++++++++++++++++++++++++++++++-------
 1 file changed, 79 insertions(+), 14 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 82123f98..2beb3614 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1419,21 +1419,37 @@ func (p *partitionProducer) Close() {
        <-cp.doneCh
 }
 
+//nolint:all
 type sendRequest struct {
-       ctx              context.Context
-       msg              *ProducerMessage
-       callback         func(MessageID, *ProducerMessage, error)
-       callbackOnce     *sync.Once
-       publishTime      time.Time
-       flushImmediately bool
-       blockCh          chan struct{}
-       closeBlockChOnce *sync.Once
-       totalChunks      int
-       chunkID          int
-       uuid             string
-       chunkRecorder    *chunkRecorder
-       transaction      *transaction
-       reservedMem      int64
+       ctx                 context.Context
+       msg                 *ProducerMessage
+       callback            func(MessageID, *ProducerMessage, error)
+       callbackOnce        *sync.Once
+       publishTime         time.Time
+       flushImmediately    bool
+       blockCh             chan struct{}
+       closeBlockChOnce    *sync.Once
+       totalChunks         int
+       chunkID             int
+       uuid                string
+       chunkRecorder       *chunkRecorder
+       transaction         *transaction
+       reservedMem         int64
+       producer            *partitionProducer
+       memLimit            internal.MemoryLimitController
+       semaphore           internal.Semaphore
+       reservedSemaphore   int
+       sendAsBatch         bool
+       schema              Schema
+       schemaVersion       []byte
+       uncompressedPayload []byte
+       uncompressedSize    int64
+       compressedPayload   []byte
+       compressedSize      int
+       payloadChunkSize    int
+       mm                  *pb.MessageMetadata
+       deliverAt           time.Time
+       maxMessageSize      int32
 }
 
 // stopBlock can be invoked multiple times safety
@@ -1443,6 +1459,55 @@ func (sr *sendRequest) stopBlock() {
        })
 }
 
+//nolint:all
+func (sr *sendRequest) done(msgID MessageID, err error) {
+       if err == nil {
+               sr.producer.metrics.PublishLatency.Observe(float64(time.Now().UnixNano()-sr.publishTime.UnixNano()) / 1.0e9)
+               sr.producer.metrics.MessagesPublished.Inc()
+               sr.producer.metrics.BytesPublished.Add(float64(sr.reservedMem))
+       }
+
+       if err != nil {
+               sr.producer.log.WithError(err).
+                       WithField("size", sr.reservedMem).
+                       WithField("properties", sr.msg.Properties)
+       }
+
+       if err == errSendTimeout {
+               sr.producer.metrics.PublishErrorsTimeout.Inc()
+       }
+
+       if err == errMessageTooLarge {
+               sr.producer.metrics.PublishErrorsMsgTooLarge.Inc()
+       }
+
+       if sr.semaphore != nil {
+               for i := 0; i < sr.reservedSemaphore; i++ {
+                       sr.semaphore.Release()
+               }
+               sr.producer.metrics.MessagesPending.Sub(float64(sr.reservedSemaphore))
+       }
+
+       if sr.memLimit != nil {
+               sr.memLimit.ReleaseMemory(sr.reservedMem)
+               sr.producer.metrics.BytesPending.Sub(float64(sr.reservedMem))
+       }
+
+       if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
+               sr.callbackOnce.Do(func() {
+                       runCallback(sr.callback, msgID, sr.msg, err)
+               })
+
+               if sr.transaction != nil {
+                       sr.transaction.endSendOrAckOp(err)
+               }
+
+               if sr.producer.options.Interceptors != nil && err == nil {
+                       sr.producer.options.Interceptors.OnSendAcknowledgement(sr.producer, sr.msg, msgID)
+               }
+       }
+}
+
 type closeProducer struct {
        doneCh chan struct{}
 }

Reply via email to