This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new 7c14fb7a fix: normalize all send request resource release into sr.done (#1121) 7c14fb7a is described below commit 7c14fb7a8e7c783db9d5738c3b2aa65d56f2db82 Author: tison <wander4...@gmail.com> AuthorDate: Wed Oct 25 19:44:33 2023 +0800 fix: normalize all send request resource release into sr.done (#1121) Signed-off-by: tison <wander4...@gmail.com> Co-authored-by: gunli <gu...@tencent.com> --- pulsar/message_chunking_test.go | 15 +- pulsar/producer_partition.go | 419 +++++++++++++++++++++------------------- pulsar/producer_test.go | 6 +- pulsar/transaction_test.go | 1 - 4 files changed, 228 insertions(+), 213 deletions(-) diff --git a/pulsar/message_chunking_test.go b/pulsar/message_chunking_test.go index fbdcaa0c..59fdb5ec 100644 --- a/pulsar/message_chunking_test.go +++ b/pulsar/message_chunking_test.go @@ -24,6 +24,7 @@ import ( "math/rand" "net/http" "strings" + "sync" "testing" "time" @@ -531,12 +532,13 @@ func TestChunkBlockIfQueueFull(t *testing.T) { assert.NotNil(t, producer) defer producer.Close() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() // Large messages will be split into 11 chunks, exceeding the length of pending queue - ID, err := producer.Send(context.Background(), &ProducerMessage{ + _, err = producer.Send(ctx, &ProducerMessage{ Payload: createTestMessagePayload(100), }) - assert.NoError(t, err) - assert.NotNil(t, ID) + assert.Error(t, err) } func createTestMessagePayload(size int) []byte { @@ -566,18 +568,15 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) { &sendRequest{ callback: func(id MessageID, producerMessage *ProducerMessage, err error) { }, + callbackOnce: &sync.Once{}, ctx: context.Background(), msg: msg, + producer: producerImpl, flushImmediately: true, totalChunks: totalChunks, chunkID: chunkID, uuid: uuid, chunkRecorder: newChunkRecorder(), - transaction: nil, - reservedMem: 0, - sendAsBatch: false, - schema: nil, - schemaVersion: nil, uncompressedPayload: wholePayload, uncompressedSize: int64(len(wholePayload)), compressedPayload: wholePayload, diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index b00ed6b5..f606fe05 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -251,7 +251,7 @@ func (p *partitionProducer) grabCnx() error { res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer) if err != nil { p.log.WithError(err).Error("Failed to create producer at send PRODUCER request") - if err == internal.ErrRequestTimeOut { + if errors.Is(err, internal.ErrRequestTimeOut) { id := p.client.rpcClient.NewRequestID() _, _ = p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_CLOSE_PRODUCER, &pb.CommandCloseProducer{ @@ -481,45 +481,28 @@ func runCallback(cb func(MessageID, *ProducerMessage, error), id MessageID, msg func (p *partitionProducer) internalSend(sr *sendRequest) { p.log.Debug("Received send request: ", *sr.msg) - msg := sr.msg - - if !p.canAddToQueue(sr) { - return - } - - // try to reserve memory for uncompressedPayload - if !p.canReserveMem(sr, sr.uncompressedSize) { - return - } - - if err := p.updateChunkInfo(sr); err != nil { - p.releaseSemaphoreAndMem(sr.uncompressedSize) - runCallback(sr.callback, nil, sr.msg, err) - p.metrics.PublishErrorsMsgTooLarge.Inc() - return - } - if sr.sendAsBatch { - smm := p.genSingleMessageMetadataInBatch(msg, int(sr.uncompressedSize)) + 
smm := p.genSingleMessageMetadataInBatch(sr.msg, int(sr.uncompressedSize)) multiSchemaEnabled := !p.options.DisableMultiSchema - added := addRequestToBatch(smm, p, sr.uncompressedPayload, sr, msg, sr.deliverAt, sr.schemaVersion, - multiSchemaEnabled) + + added := addRequestToBatch( + smm, p, sr.uncompressedPayload, sr, sr.msg, sr.deliverAt, sr.schemaVersion, multiSchemaEnabled) if !added { // The current batch is full. flush it and retry - p.internalFlushCurrentBatch() // after flushing try again to add the current payload - if ok := addRequestToBatch(smm, p, sr.uncompressedPayload, sr, msg, sr.deliverAt, sr.schemaVersion, - multiSchemaEnabled); !ok { - p.releaseSemaphoreAndMem(sr.uncompressedSize) - runCallback(sr.callback, nil, sr.msg, errFailAddToBatch) + ok := addRequestToBatch( + smm, p, sr.uncompressedPayload, sr, sr.msg, sr.deliverAt, sr.schemaVersion, multiSchemaEnabled) + if !ok { p.log.WithField("size", sr.uncompressedSize). - WithField("properties", msg.Properties). + WithField("properties", sr.msg.Properties). Error("unable to add message to batch") + sr.done(nil, errFailAddToBatch) return } } + if sr.flushImmediately { p.internalFlushCurrentBatch() } @@ -547,6 +530,7 @@ func (p *partitionProducer) internalSend(sr *sendRequest) { nsr := &sendRequest{ ctx: sr.ctx, msg: sr.msg, + producer: sr.producer, callback: sr.callback, callbackOnce: sr.callbackOnce, publishTime: sr.publishTime, @@ -556,6 +540,8 @@ func (p *partitionProducer) internalSend(sr *sendRequest) { uuid: uuid, chunkRecorder: cr, transaction: sr.transaction, + memLimit: sr.memLimit, + semaphore: sr.semaphore, reservedMem: int64(rhs - lhs), sendAsBatch: sr.sendAsBatch, schema: sr.schema, @@ -569,12 +555,8 @@ func (p *partitionProducer) internalSend(sr *sendRequest) { deliverAt: sr.deliverAt, maxMessageSize: sr.maxMessageSize, } - // the permit of first chunk has acquired - if chunkID != 0 && !p.canAddToQueue(nsr) { - p.releaseSemaphoreAndMem(sr.uncompressedSize - int64(lhs)) - return - } - p.internalSingleSend(sr.mm, sr.compressedPayload[lhs:rhs], nsr, uint32(sr.maxMessageSize)) + + p.internalSingleSend(nsr.mm, nsr.compressedPayload[lhs:rhs], nsr, uint32(nsr.maxMessageSize)) } } @@ -675,11 +657,13 @@ func (p *partitionProducer) genSingleMessageMetadataInBatch( return } -func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata, +func (p *partitionProducer) internalSingleSend( + mm *pb.MessageMetadata, compressedPayload []byte, - request *sendRequest, - maxMessageSize uint32) { - msg := request.msg + sr *sendRequest, + maxMessageSize uint32, +) { + msg := sr.msg payloadBuf := internal.NewBuffer(len(compressedPayload)) payloadBuf.Write(compressedPayload) @@ -694,8 +678,8 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata, var mostSigBits uint64 var leastSigBits uint64 - if request.transaction != nil { - txnID := request.transaction.GetTxnID() + if sr.transaction != nil { + txnID := sr.transaction.GetTxnID() useTxn = true mostSigBits = txnID.MostSigBits leastSigBits = txnID.LeastSigBits @@ -715,8 +699,7 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata, ) if err != nil { - runCallback(request.callback, nil, request.msg, err) - p.releaseSemaphoreAndMem(request.reservedMem) + sr.done(nil, err) p.log.WithError(err).Errorf("Single message serialize failed %s", msg.Value) return } @@ -725,7 +708,7 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata, sentAt: time.Now(), buffer: buffer, sequenceID: sid, - sendRequests: []interface{}{request}, + 
sendRequests: []interface{}{sr}, }) p._getConn().WriteData(buffer) } @@ -756,15 +739,14 @@ func (p *partitionProducer) internalFlushCurrentBatch() { if err != nil { for _, cb := range callbacks { if sr, ok := cb.(*sendRequest); ok { - runCallback(sr.callback, nil, sr.msg, err) + sr.done(nil, err) } } + if errors.Is(err, internal.ErrExceedMaxMessageSize) { - p.log.WithError(errMessageTooLarge). - Errorf("internal err: %s", err) - p.metrics.PublishErrorsMsgTooLarge.Inc() - return + p.log.WithError(errMessageTooLarge).Errorf("internal err: %s", err) } + return } @@ -853,25 +835,7 @@ func (p *partitionProducer) failTimeoutMessages() { for _, i := range pi.sendRequests { sr := i.(*sendRequest) - if sr.msg != nil { - size := len(sr.msg.Payload) - p.releaseSemaphoreAndMem(sr.reservedMem) - p.metrics.MessagesPending.Dec() - p.metrics.BytesPending.Sub(float64(size)) - p.metrics.PublishErrorsTimeout.Inc() - p.log.WithError(errSendTimeout). - WithField("size", size). - WithField("properties", sr.msg.Properties) - } - - if sr.callback != nil { - sr.callbackOnce.Do(func() { - runCallback(sr.callback, nil, sr.msg, errSendTimeout) - }) - } - if sr.transaction != nil { - sr.transaction.endSendOrAckOp(nil) - } + sr.done(nil, errSendTimeout) } // flag the sending has completed with error, flush make no effect @@ -899,15 +863,15 @@ func (p *partitionProducer) internalFlushCurrentBatches() { if errs[i] != nil { for _, cb := range callbacks[i] { if sr, ok := cb.(*sendRequest); ok { - runCallback(sr.callback, nil, sr.msg, errs[i]) + sr.done(nil, errs[i]) } } + if errors.Is(errs[i], internal.ErrExceedMaxMessageSize) { - p.log.WithError(errMessageTooLarge). - Errorf("internal err: %s", errs[i]) - p.metrics.PublishErrorsMsgTooLarge.Inc() + p.log.WithError(errMessageTooLarge).Errorf("internal err: %s", errs[i]) return } + continue } if batchesData[i] == nil { @@ -1036,12 +1000,6 @@ func (p *partitionProducer) prepareTransaction(sr *sendRequest) error { } sr.transaction = txn - callback := sr.callback - sr.callback = func(id MessageID, producerMessage *ProducerMessage, err error) { - runCallback(callback, id, producerMessage, err) - txn.endSendOrAckOp(err) - } - return nil } @@ -1188,19 +1146,21 @@ func (p *partitionProducer) internalSendAsync( sr := &sendRequest{ ctx: ctx, msg: msg, + producer: p, callback: callback, callbackOnce: &sync.Once{}, flushImmediately: flushImmediately, publishTime: time.Now(), + chunkID: -1, } + if err := p.prepareTransaction(sr); err != nil { - runCallback(sr.callback, nil, msg, err) + sr.done(nil, err) return } if p.getProducerState() != producerReady { - // Producer is closing - runCallback(sr.callback, nil, msg, errProducerClosed) + sr.done(nil, errProducerClosed) return } @@ -1208,18 +1168,30 @@ func (p *partitionProducer) internalSendAsync( if err := p.updateSchema(sr); err != nil { p.log.Error(err) - runCallback(sr.callback, nil, msg, err) + sr.done(nil, err) return } if err := p.updateUncompressedPayload(sr); err != nil { p.log.Error(err) - runCallback(sr.callback, nil, msg, err) + sr.done(nil, err) return } p.updateMetaData(sr) + if err := p.updateChunkInfo(sr); err != nil { + p.log.Error(err) + sr.done(nil, err) + return + } + + if err := p.reserveResources(sr); err != nil { + p.log.Error(err) + sr.done(nil, err) + return + } + p.dataChan <- sr } @@ -1257,55 +1229,40 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) for idx, i := range pi.sendRequests { sr := i.(*sendRequest) atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID)) - 
p.releaseSemaphoreAndMem(sr.reservedMem) - p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9) - p.metrics.MessagesPublished.Inc() - p.metrics.MessagesPending.Dec() - payloadSize := float64(len(sr.msg.Payload)) - p.metrics.BytesPublished.Add(payloadSize) - p.metrics.BytesPending.Sub(payloadSize) - - if sr.callback != nil || len(p.options.Interceptors) > 0 { - msgID := newMessageID( - int64(response.MessageId.GetLedgerId()), - int64(response.MessageId.GetEntryId()), - int32(idx), - p.partitionIdx, - batchSize, - ) - - if sr.totalChunks > 1 { - if sr.chunkID == 0 { - sr.chunkRecorder.setFirstChunkID( - &messageID{ - int64(response.MessageId.GetLedgerId()), - int64(response.MessageId.GetEntryId()), - -1, - p.partitionIdx, - 0, - }) - } else if sr.chunkID == sr.totalChunks-1 { - sr.chunkRecorder.setLastChunkID( - &messageID{ - int64(response.MessageId.GetLedgerId()), - int64(response.MessageId.GetEntryId()), - -1, - p.partitionIdx, - 0, - }) - // use chunkMsgID to set msgID - msgID = &sr.chunkRecorder.chunkedMsgID - } - } - if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 { - runCallback(sr.callback, msgID, sr.msg, nil) - p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID) + msgID := newMessageID( + int64(response.MessageId.GetLedgerId()), + int64(response.MessageId.GetEntryId()), + int32(idx), + p.partitionIdx, + batchSize, + ) + + if sr.totalChunks > 1 { + if sr.chunkID == 0 { + sr.chunkRecorder.setFirstChunkID( + &messageID{ + int64(response.MessageId.GetLedgerId()), + int64(response.MessageId.GetEntryId()), + -1, + p.partitionIdx, + 0, + }) + } else if sr.chunkID == sr.totalChunks-1 { + sr.chunkRecorder.setLastChunkID( + &messageID{ + int64(response.MessageId.GetLedgerId()), + int64(response.MessageId.GetEntryId()), + -1, + p.partitionIdx, + 0, + }) + // use chunkMsgID to set msgID + msgID = &sr.chunkRecorder.chunkedMsgID } } - if sr.transaction != nil { - sr.transaction.endSendOrAckOp(nil) - } + + sr.done(msgID, nil) } // Mark this pending item as done @@ -1372,24 +1329,7 @@ func (p *partitionProducer) failPendingMessages() { for _, i := range pi.sendRequests { sr := i.(*sendRequest) - if sr.msg != nil { - size := len(sr.msg.Payload) - p.releaseSemaphoreAndMem(sr.reservedMem) - p.metrics.MessagesPending.Dec() - p.metrics.BytesPending.Sub(float64(size)) - p.log.WithError(errProducerClosed). - WithField("size", size). 
- WithField("properties", sr.msg.Properties) - } - - if sr.callback != nil { - sr.callbackOnce.Do(func() { - runCallback(sr.callback, nil, sr.msg, errProducerClosed) - }) - } - if sr.transaction != nil { - sr.transaction.endSendOrAckOp(nil) - } + sr.done(nil, errProducerClosed) } // flag the sending has completed with error, flush make no effect @@ -1448,19 +1388,29 @@ func (p *partitionProducer) Close() { } type sendRequest struct { - ctx context.Context - msg *ProducerMessage - callback func(MessageID, *ProducerMessage, error) - callbackOnce *sync.Once - publishTime time.Time - flushImmediately bool - totalChunks int - chunkID int - uuid string - chunkRecorder *chunkRecorder - transaction *transaction - reservedMem int64 + ctx context.Context + msg *ProducerMessage + producer *partitionProducer + callback func(MessageID, *ProducerMessage, error) + callbackOnce *sync.Once + publishTime time.Time + flushImmediately bool + totalChunks int + chunkID int + uuid string + chunkRecorder *chunkRecorder + + /// resource management + + memLimit internal.MemoryLimitController + reservedMem int64 + semaphore internal.Semaphore + reservedSemaphore int + + /// convey settable state + sendAsBatch bool + transaction *transaction schema Schema schemaVersion []byte uncompressedPayload []byte @@ -1473,6 +1423,116 @@ type sendRequest struct { maxMessageSize int32 } +func (sr *sendRequest) done(msgID MessageID, err error) { + if err == nil { + sr.producer.metrics.PublishLatency.Observe(float64(time.Now().UnixNano()-sr.publishTime.UnixNano()) / 1.0e9) + sr.producer.metrics.MessagesPublished.Inc() + sr.producer.metrics.BytesPublished.Add(float64(sr.reservedMem)) + + if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 { + if sr.producer.options.Interceptors != nil { + sr.producer.options.Interceptors.OnSendAcknowledgement(sr.producer, sr.msg, msgID) + } + } + } + + if err != nil { + sr.producer.log.WithError(err). + WithField("size", sr.reservedMem). 
+ WithField("properties", sr.msg.Properties) + } + + if errors.Is(err, errSendTimeout) { + sr.producer.metrics.PublishErrorsTimeout.Inc() + } + + if errors.Is(err, errMessageTooLarge) { + sr.producer.metrics.PublishErrorsMsgTooLarge.Inc() + } + + if sr.semaphore != nil { + sr.semaphore.Release() + sr.producer.metrics.MessagesPending.Dec() + } + + if sr.memLimit != nil { + sr.memLimit.ReleaseMemory(sr.reservedMem) + sr.producer.metrics.BytesPending.Sub(float64(sr.reservedMem)) + } + + // sr.chunkID == -1 means a chunked message is not yet prepared, so that we should fail it immediately + if sr.totalChunks <= 1 || sr.chunkID == -1 || sr.chunkID == sr.totalChunks-1 { + sr.callbackOnce.Do(func() { + runCallback(sr.callback, msgID, sr.msg, err) + }) + + if sr.transaction != nil { + sr.transaction.endSendOrAckOp(err) + } + } +} + +func (p *partitionProducer) reserveSemaphore(sr *sendRequest) error { + for i := 0; i < sr.totalChunks; i++ { + if p.options.DisableBlockIfQueueFull { + if !p.publishSemaphore.TryAcquire() { + return errSendQueueIsFull + } + + // update sr.semaphore and sr.reservedSemaphore here so that we can release semaphore in the case + // of that only a part of the chunks acquire succeed + sr.semaphore = p.publishSemaphore + sr.reservedSemaphore++ + p.metrics.MessagesPending.Inc() + } else { + if !p.publishSemaphore.Acquire(sr.ctx) { + return errContextExpired + } + + // update sr.semaphore and sr.reservedSemaphore here so that we can release semaphore in the case + // of that only a part of the chunks acquire succeed + sr.semaphore = p.publishSemaphore + sr.reservedSemaphore++ + p.metrics.MessagesPending.Inc() + } + } + + return nil +} + +func (p *partitionProducer) reserveMem(sr *sendRequest) error { + requiredMem := sr.uncompressedSize + if !sr.sendAsBatch { + requiredMem = int64(sr.compressedSize) + } + + if p.options.DisableBlockIfQueueFull { + if !p.client.memLimit.TryReserveMemory(requiredMem) { + return errMemoryBufferIsFull + } + + } else { + if !p.client.memLimit.ReserveMemory(sr.ctx, requiredMem) { + return errContextExpired + } + } + + sr.memLimit = p.client.memLimit + sr.reservedMem += requiredMem + p.metrics.BytesPending.Add(float64(requiredMem)) + return nil +} + +func (p *partitionProducer) reserveResources(sr *sendRequest) error { + if err := p.reserveSemaphore(sr); err != nil { + return err + } + if err := p.reserveMem(sr); err != nil { + return err + } + return nil +} + type closeProducer struct { doneCh chan struct{} } @@ -1502,53 +1562,12 @@ func (p *partitionProducer) _setConn(conn internal.Connection) { // _getConn returns internal connection field of this partition producer atomically. // Note: should only be called by this partition producer before attempting to use the connection func (p *partitionProducer) _getConn() internal.Connection { - // Invariant: The conn must be non-nil for the lifetime of the partitionProducer. + // Invariant: p.conn must be non-nil for the lifetime of the partitionProducer. 
// For this reason we leave this cast unchecked and panic() if the // invariant is broken return p.conn.Load().(internal.Connection) } -func (p *partitionProducer) releaseSemaphoreAndMem(size int64) { - p.publishSemaphore.Release() - p.client.memLimit.ReleaseMemory(size) -} - -func (p *partitionProducer) canAddToQueue(sr *sendRequest) bool { - if p.options.DisableBlockIfQueueFull { - if !p.publishSemaphore.TryAcquire() { - runCallback(sr.callback, nil, sr.msg, errSendQueueIsFull) - return false - } - } else { - if !p.publishSemaphore.Acquire(sr.ctx) { - runCallback(sr.callback, nil, sr.msg, errContextExpired) - return false - } - } - p.metrics.MessagesPending.Inc() - return true -} - -func (p *partitionProducer) canReserveMem(sr *sendRequest, size int64) bool { - if p.options.DisableBlockIfQueueFull { - if !p.client.memLimit.TryReserveMemory(size) { - p.publishSemaphore.Release() - runCallback(sr.callback, nil, sr.msg, errMemoryBufferIsFull) - return false - } - - } else { - if !p.client.memLimit.ReserveMemory(sr.ctx, size) { - p.publishSemaphore.Release() - runCallback(sr.callback, nil, sr.msg, errContextExpired) - return false - } - } - sr.reservedMem += size - p.metrics.BytesPending.Add(float64(size)) - return true -} - type chunkRecorder struct { chunkedMsgID chunkMessageID } diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 29ffa780..49e225f3 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -2079,7 +2079,6 @@ func TestMemLimitRejectProducerMessagesWithSchema(t *testing.T) { } func TestMemLimitRejectProducerMessagesWithChunking(t *testing.T) { - c, err := NewClient(ClientOptions{ URL: serviceURL, MemoryLimitBytes: 5 * 1024, @@ -2136,12 +2135,11 @@ func TestMemLimitRejectProducerMessagesWithChunking(t *testing.T) { SendTimeout: 2 * time.Second, }) - // producer2 will reserve 2*1024 bytes and then release 1024 byte (release the second chunk) - // because it reaches MaxPendingMessages in chunking + // producer3 cannot reserve 2*1024 bytes because it reaches MaxPendingMessages in chunking _, _ = producer3.Send(context.Background(), &ProducerMessage{ Payload: make([]byte, 2*1024), }) - assert.Equal(t, int64(1024), c.(*client).memLimit.CurrentUsage()) + assert.Zero(t, c.(*client).memLimit.CurrentUsage()) } func TestMemLimitContextCancel(t *testing.T) { diff --git a/pulsar/transaction_test.go b/pulsar/transaction_test.go index 385b197e..74e8dd0c 100644 --- a/pulsar/transaction_test.go +++ b/pulsar/transaction_test.go @@ -458,7 +458,6 @@ func TestAckChunkMessage(t *testing.T) { // Create transaction and register the send operation. txn, err := client.NewTransaction(time.Hour) require.Nil(t, err) - txn.(*transaction).registerSendOrAckOp() // Create a producer with chunking enabled to send a large message that will be split into chunks. producer, err := client.CreateProducer(ProducerOptions{