jolshan commented on code in PR #21065:
URL: https://github.com/apache/kafka/pull/21065#discussion_r2673283612
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1132,7 +1156,7 @@ void abortBatches(final RuntimeException reason) {
dq.remove(batch);
}
batch.abort(reason);
- deallocate(batch);
+ completeBatchAndDeallocate(batch);
Review Comment:
Sorry a bit late to the party -- but yes, we could have messages already
sent batches to my understanding.
For the fix:
> I am thinking that we can fix this in the following way. In
abortBatches(), if a batch is in-flight, we call completeBatch() and otherwise,
call completeBatchAndDeallocate(). We can add a boolean in ProducerBatch to
indicate whether it's in-flight or not. The in-flight batches are already
tracked in Sender. But adding a flag in ProducerBatch is probably more
convenient.
Are these solutions mostly to indicate correctly what we should do with the
buffers and not reuse them? I'm still trying to fully understand what the
result here would look like for a transactional producer.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]