Copilot commented on code in PR #1501:
URL: https://github.com/apache/pulsar-client-go/pull/1501#discussion_r3309813776


##########
pulsar/producer_partition.go:
##########
@@ -1700,6 +1721,8 @@ func (sr *sendRequest) done(msgID MessageID, err error) {
        if pool != nil {
                // reset all the fields
                *sr = sendRequest{}
+               // Keep the guard raised until the object is reinitialized from the pool.
+               sr.doneFlag.Store(true)

Review Comment:
   `done()` sets `doneFlag` at the top, then resets the entire struct via 
`*sr = sendRequest{}`. That assignment clears `doneFlag` back to false until 
the following `doneFlag.Store(true)` runs, reopening a race window in which a 
concurrent second `done()` call can pass the CAS and operate on a partially 
reset object. To keep `done()` idempotent under concurrent calls, avoid 
overwriting `doneFlag` during the reset: clear the fields individually, or use 
a reset helper that keeps `doneFlag` true throughout reset and `Put`.
   



##########
pulsar/producer_partition.go:
##########
@@ -1648,7 +1609,67 @@ type sendRequest struct {
        maxMessageSize      int32
 }
 
+func newSendRequest(
+       ctx context.Context,
+       p *partitionProducer,
+       msg *ProducerMessage,
+       callback func(MessageID, *ProducerMessage, error),
+       flushImmediately bool,
+) *sendRequest {
+       sr := sendRequestPool.Get().(*sendRequest)
+       *sr = sendRequest{
+               pool:             sendRequestPool,
+               ctx:              ctx,
+               msg:              msg,
+               producer:         p,
+               callback:         callback,
+               callbackOnce:     &sync.Once{},
+               flushImmediately: flushImmediately,
+               publishTime:      time.Now(),
+               chunkID:          -1,
+       }
+       return sr
+}
+
+func newChunkSendRequest(p *sendRequest, chunkID int, uuid string, cr *chunkRecorder, reservedMem int64) *sendRequest {
+       sr := sendRequestPool.Get().(*sendRequest)
+       *sr = sendRequest{
+               pool:                sendRequestPool,
+               ctx:                 p.ctx,
+               msg:                 p.msg,
+               producer:            p.producer,
+               callback:            p.callback,
+               callbackOnce:        p.callbackOnce,
+               publishTime:         p.publishTime,
+               flushImmediately:    p.flushImmediately,
+               totalChunks:         p.totalChunks,
+               chunkID:             chunkID,
+               uuid:                uuid,
+               chunkRecorder:       cr,
+               transaction:         p.transaction,
+               memLimit:            p.memLimit,
+               semaphore:           p.semaphore,
+               reservedMem:         reservedMem,
+               sendAsBatch:         p.sendAsBatch,
+               schema:              p.schema,
+               schemaVersion:       p.schemaVersion,
+               uncompressedPayload: p.uncompressedPayload,
+               uncompressedSize:    p.uncompressedSize,
+               compressedPayload:   p.compressedPayload,
+               compressedSize:      p.compressedSize,
+               payloadChunkSize:    p.payloadChunkSize,
+               mm:                  p.mm,
+               deliverAt:           p.deliverAt,
+               maxMessageSize:      p.maxMessageSize,
+       }

Review Comment:
   `newChunkSendRequest` also uses `*sr = sendRequest{...}` to initialize a 
pooled object containing `atomic.Bool`. This both violates the atomic-copying 
constraint and can clear `doneFlag` to false during initialization, 
reintroducing the stale-pointer race the guard is meant to prevent. Initialize 
fields without overwriting the atomic field and only clear the guard 
(`doneFlag.Store(false)`) as the final step once the object is fully 
initialized.
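
   A minimal sketch of "fill the fields, lower the guard last", assuming a 
reduced `chunkRequest` type and a pool whose `New` function hands out objects 
with the guard already raised. Both the type and `chunkPool` are hypothetical 
names used for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// chunkRequest is a simplified, hypothetical stand-in for sendRequest.
type chunkRequest struct {
	doneFlag atomic.Bool
	chunkID  int
	uuid     string
}

// chunkPool creates objects with the guard raised, so a stale done()
// on a recycled pointer is rejected until initialization finishes.
var chunkPool = sync.Pool{New: func() any {
	r := new(chunkRequest)
	r.doneFlag.Store(true)
	return r
}}

func newChunkRequest(chunkID int, uuid string) *chunkRequest {
	r := chunkPool.Get().(*chunkRequest)
	// Assign fields one by one; never overwrite the whole struct.
	r.chunkID = chunkID
	r.uuid = uuid
	// Lower the guard last, once every field is in place.
	r.doneFlag.Store(false)
	return r
}

func main() {
	r := newChunkRequest(3, "abc")
	fmt.Println(r.chunkID, r.uuid, r.doneFlag.Load())
}
```

   With per-field assignment, every field has to be set explicitly (or 
cleared in the reset path), otherwise values from the previous use of the 
pooled object can leak into the new request.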
   



##########
pulsar/producer_partition.go:
##########
@@ -1648,7 +1609,67 @@ type sendRequest struct {
        maxMessageSize      int32
 }
 
+func newSendRequest(
+       ctx context.Context,
+       p *partitionProducer,
+       msg *ProducerMessage,
+       callback func(MessageID, *ProducerMessage, error),
+       flushImmediately bool,
+) *sendRequest {
+       sr := sendRequestPool.Get().(*sendRequest)
+       *sr = sendRequest{
+               pool:             sendRequestPool,
+               ctx:              ctx,
+               msg:              msg,
+               producer:         p,
+               callback:         callback,
+               callbackOnce:     &sync.Once{},
+               flushImmediately: flushImmediately,
+               publishTime:      time.Now(),
+               chunkID:          -1,
+       }
+       return sr
+}
+
+func newChunkSendRequest(p *sendRequest, chunkID int, uuid string, cr *chunkRecorder, reservedMem int64) *sendRequest {
+       sr := sendRequestPool.Get().(*sendRequest)
+       *sr = sendRequest{
+               pool:                sendRequestPool,
+               ctx:                 p.ctx,
+               msg:                 p.msg,
+               producer:            p.producer,
+               callback:            p.callback,
+               callbackOnce:        p.callbackOnce,
+               publishTime:         p.publishTime,
+               flushImmediately:    p.flushImmediately,
+               totalChunks:         p.totalChunks,
+               chunkID:             chunkID,
+               uuid:                uuid,
+               chunkRecorder:       cr,
+               transaction:         p.transaction,
+               memLimit:            p.memLimit,
+               semaphore:           p.semaphore,
+               reservedMem:         reservedMem,
+               sendAsBatch:         p.sendAsBatch,
+               schema:              p.schema,
+               schemaVersion:       p.schemaVersion,
+               uncompressedPayload: p.uncompressedPayload,
+               uncompressedSize:    p.uncompressedSize,
+               compressedPayload:   p.compressedPayload,
+               compressedSize:      p.compressedSize,
+               payloadChunkSize:    p.payloadChunkSize,
+               mm:                  p.mm,
+               deliverAt:           p.deliverAt,
+               maxMessageSize:      p.maxMessageSize,
+       }

Review Comment:
   `newSendRequest` reinitializes pooled objects with `*sr = sendRequest{...}`. 
Since `sendRequest` now contains an `atomic.Bool`, overwriting it via struct 
assignment is unsafe: atomic types must not be copied after first use. The 
assignment also drops the guard to false early in initialization, so a 
stale-pointer `done()` call that races with pool reuse can proceed. Prefer 
keeping `doneFlag` true while filling in the fields, without struct-assigning 
over the atomic field, then call `Store(false)` as the last initialization 
step.
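
   One possible way to keep the convenience of a composite literal without 
assigning over the atomic field is to group the resettable fields in an 
embedded struct. This is only a sketch under that assumption; `requestFields`, 
`guardedRequest`, and `reinit` are hypothetical names, not part of the PR.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// requestFields groups everything that may be overwritten wholesale.
type requestFields struct {
	topic       string
	publishTime time.Time
	chunkID     int
}

// guardedRequest keeps the atomic guard outside the assignable part.
type guardedRequest struct {
	doneFlag atomic.Bool
	requestFields
}

// reinit overwrites only the embedded fields, then lowers the guard.
func (r *guardedRequest) reinit(topic string) {
	r.requestFields = requestFields{
		topic:       topic,
		publishTime: time.Now(),
		chunkID:     -1,
	}
	// The guard is touched only through its atomic API, and only last.
	r.doneFlag.Store(false)
}

func main() {
	r := &guardedRequest{}
	r.doneFlag.Store(true) // as if the object had just come from the pool
	r.reinit("t")
	fmt.Println(r.topic, r.chunkID, r.doneFlag.Load())
}
```

   The struct assignment here covers `requestFields` only, so `doneFlag` is 
never copied or cleared by a plain write.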
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
