This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c14fb7a fix: normalize all send request resource release into sr.done (#1121)
7c14fb7a is described below

commit 7c14fb7a8e7c783db9d5738c3b2aa65d56f2db82
Author: tison <wander4...@gmail.com>
AuthorDate: Wed Oct 25 19:44:33 2023 +0800

    fix: normalize all send request resource release into sr.done (#1121)
    
    Signed-off-by: tison <wander4...@gmail.com>
    Co-authored-by: gunli <gu...@tencent.com>
---
 pulsar/message_chunking_test.go |  15 +-
 pulsar/producer_partition.go    | 419 +++++++++++++++++++++-------------------
 pulsar/producer_test.go         |   6 +-
 pulsar/transaction_test.go      |   1 -
 4 files changed, 228 insertions(+), 213 deletions(-)

diff --git a/pulsar/message_chunking_test.go b/pulsar/message_chunking_test.go
index fbdcaa0c..59fdb5ec 100644
--- a/pulsar/message_chunking_test.go
+++ b/pulsar/message_chunking_test.go
@@ -24,6 +24,7 @@ import (
        "math/rand"
        "net/http"
        "strings"
+       "sync"
        "testing"
        "time"
 
@@ -531,12 +532,13 @@ func TestChunkBlockIfQueueFull(t *testing.T) {
        assert.NotNil(t, producer)
        defer producer.Close()
 
+       ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+       defer cancel()
        // Large messages will be split into 11 chunks, exceeding the length of pending queue
-       ID, err := producer.Send(context.Background(), &ProducerMessage{
+       _, err = producer.Send(ctx, &ProducerMessage{
                Payload: createTestMessagePayload(100),
        })
-       assert.NoError(t, err)
-       assert.NotNil(t, ID)
+       assert.Error(t, err)
 }
 
 func createTestMessagePayload(size int) []byte {
@@ -566,18 +568,15 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) {
                &sendRequest{
                        callback: func(id MessageID, producerMessage *ProducerMessage, err error) {
                        },
+                       callbackOnce:        &sync.Once{},
                        ctx:                 context.Background(),
                        msg:                 msg,
+                       producer:            producerImpl,
                        flushImmediately:    true,
                        totalChunks:         totalChunks,
                        chunkID:             chunkID,
                        uuid:                uuid,
                        chunkRecorder:       newChunkRecorder(),
-                       transaction:         nil,
-                       reservedMem:         0,
-                       sendAsBatch:         false,
-                       schema:              nil,
-                       schemaVersion:       nil,
                        uncompressedPayload: wholePayload,
                        uncompressedSize:    int64(len(wholePayload)),
                        compressedPayload:   wholePayload,
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index b00ed6b5..f606fe05 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -251,7 +251,7 @@ func (p *partitionProducer) grabCnx() error {
        res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer)
        if err != nil {
                p.log.WithError(err).Error("Failed to create producer at send PRODUCER request")
-               if err == internal.ErrRequestTimeOut {
+               if errors.Is(err, internal.ErrRequestTimeOut) {
                        id := p.client.rpcClient.NewRequestID()
                _, _ = p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_CLOSE_PRODUCER,
                                &pb.CommandCloseProducer{
@@ -481,45 +481,28 @@ func runCallback(cb func(MessageID, *ProducerMessage, error), id MessageID, msg
 func (p *partitionProducer) internalSend(sr *sendRequest) {
        p.log.Debug("Received send request: ", *sr.msg)
 
-       msg := sr.msg
-
-       if !p.canAddToQueue(sr) {
-               return
-       }
-
-       // try to reserve memory for uncompressedPayload
-       if !p.canReserveMem(sr, sr.uncompressedSize) {
-               return
-       }
-
-       if err := p.updateChunkInfo(sr); err != nil {
-               p.releaseSemaphoreAndMem(sr.uncompressedSize)
-               runCallback(sr.callback, nil, sr.msg, err)
-               p.metrics.PublishErrorsMsgTooLarge.Inc()
-               return
-       }
-
        if sr.sendAsBatch {
-               smm := p.genSingleMessageMetadataInBatch(msg, int(sr.uncompressedSize))
+               smm := p.genSingleMessageMetadataInBatch(sr.msg, int(sr.uncompressedSize))
                multiSchemaEnabled := !p.options.DisableMultiSchema
-               added := addRequestToBatch(smm, p, sr.uncompressedPayload, sr, msg, sr.deliverAt, sr.schemaVersion,
-                       multiSchemaEnabled)
+
+               added := addRequestToBatch(
+                       smm, p, sr.uncompressedPayload, sr, sr.msg, sr.deliverAt, sr.schemaVersion, multiSchemaEnabled)
                if !added {
                        // The current batch is full. flush it and retry
-
                        p.internalFlushCurrentBatch()
 
                        // after flushing try again to add the current payload
-                       if ok := addRequestToBatch(smm, p, sr.uncompressedPayload, sr, msg, sr.deliverAt, sr.schemaVersion,
-                               multiSchemaEnabled); !ok {
-                               p.releaseSemaphoreAndMem(sr.uncompressedSize)
-                               runCallback(sr.callback, nil, sr.msg, errFailAddToBatch)
+                       ok := addRequestToBatch(
+                               smm, p, sr.uncompressedPayload, sr, sr.msg, sr.deliverAt, sr.schemaVersion, multiSchemaEnabled)
+                       if !ok {
                                p.log.WithField("size", sr.uncompressedSize).
-                                       WithField("properties", msg.Properties).
+                                       WithField("properties", sr.msg.Properties).
                                        Error("unable to add message to batch")
+                               sr.done(nil, errFailAddToBatch)
                                return
                        }
                }
+
                if sr.flushImmediately {
                        p.internalFlushCurrentBatch()
                }
@@ -547,6 +530,7 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
                nsr := &sendRequest{
                        ctx:                 sr.ctx,
                        msg:                 sr.msg,
+                       producer:            sr.producer,
                        callback:            sr.callback,
                        callbackOnce:        sr.callbackOnce,
                        publishTime:         sr.publishTime,
@@ -556,6 +540,8 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
                        uuid:                uuid,
                        chunkRecorder:       cr,
                        transaction:         sr.transaction,
+                       memLimit:            sr.memLimit,
+                       semaphore:           sr.semaphore,
                        reservedMem:         int64(rhs - lhs),
                        sendAsBatch:         sr.sendAsBatch,
                        schema:              sr.schema,
@@ -569,12 +555,8 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
                        deliverAt:           sr.deliverAt,
                        maxMessageSize:      sr.maxMessageSize,
                }
-               // the permit of first chunk has acquired
-               if chunkID != 0 && !p.canAddToQueue(nsr) {
-                       p.releaseSemaphoreAndMem(sr.uncompressedSize - int64(lhs))
-                       return
-               }
-               p.internalSingleSend(sr.mm, sr.compressedPayload[lhs:rhs], nsr, uint32(sr.maxMessageSize))
+
+               p.internalSingleSend(nsr.mm, nsr.compressedPayload[lhs:rhs], nsr, uint32(nsr.maxMessageSize))
        }
 }
 
@@ -675,11 +657,13 @@ func (p *partitionProducer) genSingleMessageMetadataInBatch(
        return
 }
 
-func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,
+func (p *partitionProducer) internalSingleSend(
+       mm *pb.MessageMetadata,
        compressedPayload []byte,
-       request *sendRequest,
-       maxMessageSize uint32) {
-       msg := request.msg
+       sr *sendRequest,
+       maxMessageSize uint32,
+) {
+       msg := sr.msg
 
        payloadBuf := internal.NewBuffer(len(compressedPayload))
        payloadBuf.Write(compressedPayload)
@@ -694,8 +678,8 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,
        var mostSigBits uint64
        var leastSigBits uint64
 
-       if request.transaction != nil {
-               txnID := request.transaction.GetTxnID()
+       if sr.transaction != nil {
+               txnID := sr.transaction.GetTxnID()
                useTxn = true
                mostSigBits = txnID.MostSigBits
                leastSigBits = txnID.LeastSigBits
@@ -715,8 +699,7 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,
        )
 
        if err != nil {
-               runCallback(request.callback, nil, request.msg, err)
-               p.releaseSemaphoreAndMem(request.reservedMem)
+               sr.done(nil, err)
                p.log.WithError(err).Errorf("Single message serialize failed %s", msg.Value)
                return
        }
@@ -725,7 +708,7 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,
                sentAt:       time.Now(),
                buffer:       buffer,
                sequenceID:   sid,
-               sendRequests: []interface{}{request},
+               sendRequests: []interface{}{sr},
        })
        p._getConn().WriteData(buffer)
 }
@@ -756,15 +739,14 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
        if err != nil {
                for _, cb := range callbacks {
                        if sr, ok := cb.(*sendRequest); ok {
-                               runCallback(sr.callback, nil, sr.msg, err)
+                               sr.done(nil, err)
                        }
                }
+
                if errors.Is(err, internal.ErrExceedMaxMessageSize) {
-                       p.log.WithError(errMessageTooLarge).
-                               Errorf("internal err: %s", err)
-                       p.metrics.PublishErrorsMsgTooLarge.Inc()
-                       return
+                       p.log.WithError(errMessageTooLarge).Errorf("internal err: %s", err)
                }
+
                return
        }
 
@@ -853,25 +835,7 @@ func (p *partitionProducer) failTimeoutMessages() {
 
                        for _, i := range pi.sendRequests {
                                sr := i.(*sendRequest)
-                               if sr.msg != nil {
-                                       size := len(sr.msg.Payload)
-                                       p.releaseSemaphoreAndMem(sr.reservedMem)
-                                       p.metrics.MessagesPending.Dec()
-                                       p.metrics.BytesPending.Sub(float64(size))
-                                       p.metrics.PublishErrorsTimeout.Inc()
-                                       p.log.WithError(errSendTimeout).
-                                               WithField("size", size).
-                                               WithField("properties", sr.msg.Properties)
-                               }
-
-                               if sr.callback != nil {
-                                       sr.callbackOnce.Do(func() {
-                                               runCallback(sr.callback, nil, sr.msg, errSendTimeout)
-                                       })
-                               }
-                               if sr.transaction != nil {
-                                       sr.transaction.endSendOrAckOp(nil)
-                               }
+                               sr.done(nil, errSendTimeout)
                        }
 
                        // flag the sending has completed with error, flush make no effect
@@ -899,15 +863,15 @@ func (p *partitionProducer) internalFlushCurrentBatches() {
                if errs[i] != nil {
                        for _, cb := range callbacks[i] {
                                if sr, ok := cb.(*sendRequest); ok {
-                                       runCallback(sr.callback, nil, sr.msg, errs[i])
+                                       sr.done(nil, errs[i])
                                }
                        }
+
                        if errors.Is(errs[i], internal.ErrExceedMaxMessageSize) {
-                               p.log.WithError(errMessageTooLarge).
-                                       Errorf("internal err: %s", errs[i])
-                               p.metrics.PublishErrorsMsgTooLarge.Inc()
+                               p.log.WithError(errMessageTooLarge).Errorf("internal err: %s", errs[i])
                                return
                        }
+
                        continue
                }
                if batchesData[i] == nil {
@@ -1036,12 +1000,6 @@ func (p *partitionProducer) prepareTransaction(sr *sendRequest) error {
        }
 
        sr.transaction = txn
-       callback := sr.callback
-       sr.callback = func(id MessageID, producerMessage *ProducerMessage, err error) {
-               runCallback(callback, id, producerMessage, err)
-               txn.endSendOrAckOp(err)
-       }
-
        return nil
 }
 
@@ -1188,19 +1146,21 @@ func (p *partitionProducer) internalSendAsync(
        sr := &sendRequest{
                ctx:              ctx,
                msg:              msg,
+               producer:         p,
                callback:         callback,
                callbackOnce:     &sync.Once{},
                flushImmediately: flushImmediately,
                publishTime:      time.Now(),
+               chunkID:          -1,
        }
+
        if err := p.prepareTransaction(sr); err != nil {
-               runCallback(sr.callback, nil, msg, err)
+               sr.done(nil, err)
                return
        }
 
        if p.getProducerState() != producerReady {
-               // Producer is closing
-               runCallback(sr.callback, nil, msg, errProducerClosed)
+               sr.done(nil, errProducerClosed)
                return
        }
 
@@ -1208,18 +1168,30 @@ func (p *partitionProducer) internalSendAsync(
 
        if err := p.updateSchema(sr); err != nil {
                p.log.Error(err)
-               runCallback(sr.callback, nil, msg, err)
+               sr.done(nil, err)
                return
        }
 
        if err := p.updateUncompressedPayload(sr); err != nil {
                p.log.Error(err)
-               runCallback(sr.callback, nil, msg, err)
+               sr.done(nil, err)
                return
        }
 
        p.updateMetaData(sr)
 
+       if err := p.updateChunkInfo(sr); err != nil {
+               p.log.Error(err)
+               sr.done(nil, err)
+               return
+       }
+
+       if err := p.reserveResources(sr); err != nil {
+               p.log.Error(err)
+               sr.done(nil, err)
+               return
+       }
+
        p.dataChan <- sr
 }
 
@@ -1257,55 +1229,40 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
                for idx, i := range pi.sendRequests {
                        sr := i.(*sendRequest)
                        atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
-                       p.releaseSemaphoreAndMem(sr.reservedMem)
-                       p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
-                       p.metrics.MessagesPublished.Inc()
-                       p.metrics.MessagesPending.Dec()
-                       payloadSize := float64(len(sr.msg.Payload))
-                       p.metrics.BytesPublished.Add(payloadSize)
-                       p.metrics.BytesPending.Sub(payloadSize)
-
-                       if sr.callback != nil || len(p.options.Interceptors) > 0 {
-                               msgID := newMessageID(
-                                       int64(response.MessageId.GetLedgerId()),
-                                       int64(response.MessageId.GetEntryId()),
-                                       int32(idx),
-                                       p.partitionIdx,
-                                       batchSize,
-                               )
-
-                               if sr.totalChunks > 1 {
-                                       if sr.chunkID == 0 {
-                                               sr.chunkRecorder.setFirstChunkID(
-                                                       &messageID{
-                                                               int64(response.MessageId.GetLedgerId()),
-                                                               int64(response.MessageId.GetEntryId()),
-                                                               -1,
-                                                               p.partitionIdx,
-                                                               0,
-                                                       })
-                                       } else if sr.chunkID == sr.totalChunks-1 {
-                                               sr.chunkRecorder.setLastChunkID(
-                                                       &messageID{
-                                                               int64(response.MessageId.GetLedgerId()),
-                                                               int64(response.MessageId.GetEntryId()),
-                                                               -1,
-                                                               p.partitionIdx,
-                                                               0,
-                                                       })
-                                               // use chunkMsgID to set msgID
-                                               msgID = &sr.chunkRecorder.chunkedMsgID
-                                       }
-                               }
 
-                               if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
-                                       runCallback(sr.callback, msgID, sr.msg, nil)
-                                       p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID)
+                       msgID := newMessageID(
+                               int64(response.MessageId.GetLedgerId()),
+                               int64(response.MessageId.GetEntryId()),
+                               int32(idx),
+                               p.partitionIdx,
+                               batchSize,
+                       )
+
+                       if sr.totalChunks > 1 {
+                               if sr.chunkID == 0 {
+                                       sr.chunkRecorder.setFirstChunkID(
+                                               &messageID{
+                                                       int64(response.MessageId.GetLedgerId()),
+                                                       int64(response.MessageId.GetEntryId()),
+                                                       -1,
+                                                       p.partitionIdx,
+                                                       0,
+                                               })
+                               } else if sr.chunkID == sr.totalChunks-1 {
+                                       sr.chunkRecorder.setLastChunkID(
+                                               &messageID{
+                                                       int64(response.MessageId.GetLedgerId()),
+                                                       int64(response.MessageId.GetEntryId()),
+                                                       -1,
+                                                       p.partitionIdx,
+                                                       0,
+                                               })
+                                       // use chunkMsgID to set msgID
+                                       msgID = &sr.chunkRecorder.chunkedMsgID
                                }
                        }
-                       if sr.transaction != nil {
-                               sr.transaction.endSendOrAckOp(nil)
-                       }
+
+                       sr.done(msgID, nil)
                }
 
                // Mark this pending item as done
@@ -1372,24 +1329,7 @@ func (p *partitionProducer) failPendingMessages() {
 
                for _, i := range pi.sendRequests {
                        sr := i.(*sendRequest)
-                       if sr.msg != nil {
-                               size := len(sr.msg.Payload)
-                               p.releaseSemaphoreAndMem(sr.reservedMem)
-                               p.metrics.MessagesPending.Dec()
-                               p.metrics.BytesPending.Sub(float64(size))
-                               p.log.WithError(errProducerClosed).
-                                       WithField("size", size).
-                                       WithField("properties", sr.msg.Properties)
-                       }
-
-                       if sr.callback != nil {
-                               sr.callbackOnce.Do(func() {
-                                       runCallback(sr.callback, nil, sr.msg, errProducerClosed)
-                               })
-                       }
-                       if sr.transaction != nil {
-                               sr.transaction.endSendOrAckOp(nil)
-                       }
+                       sr.done(nil, errProducerClosed)
                }
 
                // flag the sending has completed with error, flush make no effect
@@ -1448,19 +1388,29 @@ func (p *partitionProducer) Close() {
 }
 
 type sendRequest struct {
-       ctx                 context.Context
-       msg                 *ProducerMessage
-       callback            func(MessageID, *ProducerMessage, error)
-       callbackOnce        *sync.Once
-       publishTime         time.Time
-       flushImmediately    bool
-       totalChunks         int
-       chunkID             int
-       uuid                string
-       chunkRecorder       *chunkRecorder
-       transaction         *transaction
-       reservedMem         int64
+       ctx              context.Context
+       msg              *ProducerMessage
+       producer         *partitionProducer
+       callback         func(MessageID, *ProducerMessage, error)
+       callbackOnce     *sync.Once
+       publishTime      time.Time
+       flushImmediately bool
+       totalChunks      int
+       chunkID          int
+       uuid             string
+       chunkRecorder    *chunkRecorder
+
+       /// resource management
+
+       memLimit          internal.MemoryLimitController
+       reservedMem       int64
+       semaphore         internal.Semaphore
+       reservedSemaphore int
+
+       /// convey settable state
+
        sendAsBatch         bool
+       transaction         *transaction
        schema              Schema
        schemaVersion       []byte
        uncompressedPayload []byte
@@ -1473,6 +1423,116 @@ type sendRequest struct {
        maxMessageSize      int32
 }
 
+func (sr *sendRequest) done(msgID MessageID, err error) {
+       if err == nil {
+               sr.producer.metrics.PublishLatency.Observe(float64(time.Now().UnixNano()-sr.publishTime.UnixNano()) / 1.0e9)
+               sr.producer.metrics.MessagesPublished.Inc()
+               sr.producer.metrics.BytesPublished.Add(float64(sr.reservedMem))
+
+               if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
+                       if sr.producer.options.Interceptors != nil {
+                               sr.producer.options.Interceptors.OnSendAcknowledgement(sr.producer, sr.msg, msgID)
+                       }
+               }
+       }
+
+       if err != nil {
+               sr.producer.log.WithError(err).
+                       WithField("size", sr.reservedMem).
+                       WithField("properties", sr.msg.Properties)
+       }
+
+       if errors.Is(err, errSendTimeout) {
+               sr.producer.metrics.PublishErrorsTimeout.Inc()
+       }
+
+       if errors.Is(err, errMessageTooLarge) {
+               sr.producer.metrics.PublishErrorsMsgTooLarge.Inc()
+       }
+
+       if sr.semaphore != nil {
+               sr.semaphore.Release()
+               sr.producer.metrics.MessagesPending.Dec()
+       }
+
+       if sr.memLimit != nil {
+               sr.memLimit.ReleaseMemory(sr.reservedMem)
+               sr.producer.metrics.BytesPending.Sub(float64(sr.reservedMem))
+       }
+
+       // sr.chunkID == -1 means a chunked message is not yet prepared, so that we should fail it immediately
+       if sr.totalChunks <= 1 || sr.chunkID == -1 || sr.chunkID == sr.totalChunks-1 {
+               sr.callbackOnce.Do(func() {
+                       runCallback(sr.callback, msgID, sr.msg, err)
+               })
+
+               if sr.transaction != nil {
+                       sr.transaction.endSendOrAckOp(err)
+               }
+       }
+}
+
+func (p *partitionProducer) reserveSemaphore(sr *sendRequest) error {
+       for i := 0; i < sr.totalChunks; i++ {
+               if p.options.DisableBlockIfQueueFull {
+                       if !p.publishSemaphore.TryAcquire() {
+                               return errSendQueueIsFull
+                       }
+
+                       // update sr.semaphore and sr.reservedSemaphore here so that we can release semaphore in the case
+                       // of that only a part of the chunks acquire succeed
+                       sr.semaphore = p.publishSemaphore
+                       sr.reservedSemaphore++
+                       p.metrics.MessagesPending.Inc()
+               } else {
+                       if !p.publishSemaphore.Acquire(sr.ctx) {
+                               return errContextExpired
+                       }
+
+                       // update sr.semaphore and sr.reservedSemaphore here so that we can release semaphore in the case
+                       // of that only a part of the chunks acquire succeed
+                       sr.semaphore = p.publishSemaphore
+                       sr.reservedSemaphore++
+                       p.metrics.MessagesPending.Inc()
+               }
+       }
+
+       return nil
+}
+
+func (p *partitionProducer) reserveMem(sr *sendRequest) error {
+       requiredMem := sr.uncompressedSize
+       if !sr.sendAsBatch {
+               requiredMem = int64(sr.compressedSize)
+       }
+
+       if p.options.DisableBlockIfQueueFull {
+               if !p.client.memLimit.TryReserveMemory(requiredMem) {
+                       return errMemoryBufferIsFull
+               }
+
+       } else {
+               if !p.client.memLimit.ReserveMemory(sr.ctx, requiredMem) {
+                       return errContextExpired
+               }
+       }
+
+       sr.memLimit = p.client.memLimit
+       sr.reservedMem += requiredMem
+       p.metrics.BytesPending.Add(float64(requiredMem))
+       return nil
+}
+
+func (p *partitionProducer) reserveResources(sr *sendRequest) error {
+       if err := p.reserveSemaphore(sr); err != nil {
+               return err
+       }
+       if err := p.reserveMem(sr); err != nil {
+               return err
+       }
+       return nil
+}
+
 type closeProducer struct {
        doneCh chan struct{}
 }
@@ -1502,53 +1562,12 @@ func (p *partitionProducer) _setConn(conn internal.Connection) {
 // _getConn returns internal connection field of this partition producer atomically.
 // Note: should only be called by this partition producer before attempting to use the connection
 func (p *partitionProducer) _getConn() internal.Connection {
-       // Invariant: The conn must be non-nil for the lifetime of the partitionProducer.
+       // Invariant: p.conn must be non-nil for the lifetime of the partitionProducer.
        //            For this reason we leave this cast unchecked and panic() if the
        //            invariant is broken
        return p.conn.Load().(internal.Connection)
 }
 
-func (p *partitionProducer) releaseSemaphoreAndMem(size int64) {
-       p.publishSemaphore.Release()
-       p.client.memLimit.ReleaseMemory(size)
-}
-
-func (p *partitionProducer) canAddToQueue(sr *sendRequest) bool {
-       if p.options.DisableBlockIfQueueFull {
-               if !p.publishSemaphore.TryAcquire() {
-                       runCallback(sr.callback, nil, sr.msg, errSendQueueIsFull)
-                       return false
-               }
-       } else {
-               if !p.publishSemaphore.Acquire(sr.ctx) {
-                       runCallback(sr.callback, nil, sr.msg, errContextExpired)
-                       return false
-               }
-       }
-       p.metrics.MessagesPending.Inc()
-       return true
-}
-
-func (p *partitionProducer) canReserveMem(sr *sendRequest, size int64) bool {
-       if p.options.DisableBlockIfQueueFull {
-               if !p.client.memLimit.TryReserveMemory(size) {
-                       p.publishSemaphore.Release()
-                       runCallback(sr.callback, nil, sr.msg, errMemoryBufferIsFull)
-                       return false
-               }
-
-       } else {
-               if !p.client.memLimit.ReserveMemory(sr.ctx, size) {
-                       p.publishSemaphore.Release()
-                       runCallback(sr.callback, nil, sr.msg, errContextExpired)
-                       return false
-               }
-       }
-       sr.reservedMem += size
-       p.metrics.BytesPending.Add(float64(size))
-       return true
-}
-
 type chunkRecorder struct {
        chunkedMsgID chunkMessageID
 }
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 29ffa780..49e225f3 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -2079,7 +2079,6 @@ func TestMemLimitRejectProducerMessagesWithSchema(t *testing.T) {
 }
 
 func TestMemLimitRejectProducerMessagesWithChunking(t *testing.T) {
-
        c, err := NewClient(ClientOptions{
                URL:              serviceURL,
                MemoryLimitBytes: 5 * 1024,
@@ -2136,12 +2135,11 @@ func TestMemLimitRejectProducerMessagesWithChunking(t *testing.T) {
                SendTimeout:             2 * time.Second,
        })
 
-       // producer2 will reserve 2*1024 bytes and then release 1024 byte (release the second chunk)
-       // because it reaches MaxPendingMessages in chunking
+       // producer3 cannot reserve 2*1024 bytes because it reaches MaxPendingMessages in chunking
        _, _ = producer3.Send(context.Background(), &ProducerMessage{
                Payload: make([]byte, 2*1024),
        })
-       assert.Equal(t, int64(1024), c.(*client).memLimit.CurrentUsage())
+       assert.Zero(t, c.(*client).memLimit.CurrentUsage())
 }
 
 func TestMemLimitContextCancel(t *testing.T) {
diff --git a/pulsar/transaction_test.go b/pulsar/transaction_test.go
index 385b197e..74e8dd0c 100644
--- a/pulsar/transaction_test.go
+++ b/pulsar/transaction_test.go
@@ -458,7 +458,6 @@ func TestAckChunkMessage(t *testing.T) {
        // Create transaction and register the send operation.
        txn, err := client.NewTransaction(time.Hour)
        require.Nil(t, err)
-       txn.(*transaction).registerSendOrAckOp()
 
        // Create a producer with chunking enabled to send a large message that will be split into chunks.
        producer, err := client.CreateProducer(ProducerOptions{

Reply via email to