dnadolny commented on code in PR #21065:
URL: https://github.com/apache/kafka/pull/21065#discussion_r2674253553
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1027,14 +1027,34 @@ BuiltInPartitioner createBuiltInPartitioner(LogContext logContext, String topic,
}
/**
- * Deallocate the record batch
+ * Complete and deallocate the record batch
+ */
+ public void completeAndDeallocateBatch(ProducerBatch batch) {
+ completeBatch(batch);
+ deallocate(batch);
+ }
+
+ /**
+ * Only perform deallocation (and not removal from the incomplete set)
*/
public void deallocate(ProducerBatch batch) {
- incomplete.remove(batch);
// Only deallocate the batch if it is not a split batch because split batch are allocated outside the
// buffer pool.
- if (!batch.isSplitBatch())
- free.deallocate(batch.buffer(), batch.initialCapacity());
+ if (!batch.isSplitBatch()) {
+ if (batch.isBufferDeallocated()) {
+ log.warn("Skipping deallocating a batch that has already been deallocated. Batch is {}, created time is {}", batch, batch.createdMs);
+ } else {
+ batch.markBufferDeallocated();
Review Comment:
I've changed it to throw an `IllegalStateException`, though this means that
we'll leak memory if it happens: if we throw an exception here, we may never
deallocate the batch. The amount of memory leaked is small relative to the
heap, but if the BufferPool runs low against its configured size, the producer
slows down or blocks indefinitely.
An alternative would be to use an `assert`, which runs during tests but, as far
as I know, isn't enabled by default when running for real. This won't leak
memory, but it allows the potential for corruption.
I think the latter is worse, but wanted to bring it up.
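To make the trade-off concrete, here is a minimal sketch of the two guard styles. It is not the PR's code: `Batch`, `Pool`, `deallocateStrict`, and `deallocateWithAssert` are hypothetical stand-ins for `ProducerBatch`, `BufferPool`, and the `deallocate` path, and the sketch only models the pool's byte accounting, not the leak path described above.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; these types are NOT Kafka's ProducerBatch or BufferPool.
class DeallocationGuardSketch {

    // Stand-in for a batch that remembers whether its buffer was returned.
    static final class Batch {
        private final AtomicBoolean bufferDeallocated = new AtomicBoolean(false);

        // Returns true only for the first caller, false on any repeat call.
        boolean markBufferDeallocated() {
            return bufferDeallocated.compareAndSet(false, true);
        }
    }

    // Stand-in for a buffer pool that only tracks available bytes.
    static final class Pool {
        private long available;

        Pool(long available) {
            this.available = available;
        }

        void deallocate(long size) {
            available += size;
        }

        long available() {
            return available;
        }
    }

    // Option 1: always fail loudly on a second deallocation.
    // The pool's accounting stays correct, but the exception propagates to
    // the caller and can interrupt whatever cleanup it was doing.
    static void deallocateStrict(Pool pool, Batch batch, long size) {
        if (!batch.markBufferDeallocated())
            throw new IllegalStateException("Batch buffer was already deallocated");
        pool.deallocate(size);
    }

    // Option 2: check only when the JVM runs with assertions enabled (-ea).
    // With assertions disabled, a second call silently credits the pool
    // twice, which is the corruption risk.
    static void deallocateWithAssert(Pool pool, Batch batch, long size) {
        boolean firstCall = batch.markBufferDeallocated();
        assert firstCall : "Batch buffer was already deallocated";
        pool.deallocate(size);
    }

    public static void main(String[] args) {
        Pool pool = new Pool(0);
        Batch batch = new Batch();
        deallocateStrict(pool, batch, 16);
        try {
            deallocateStrict(pool, batch, 16);
        } catch (IllegalStateException e) {
            System.out.println("strict: second deallocation rejected, available=" + pool.available());
        }
    }
}
```

The strict variant behaves the same in tests and in production; the `assert` variant behaves differently depending on whether the JVM was started with `-ea`.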