junrao commented on code in PR #21065:
URL: https://github.com/apache/kafka/pull/21065#discussion_r2677364176
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1132,7 +1157,14 @@ void abortBatches(final RuntimeException reason) {
dq.remove(batch);
}
batch.abort(reason);
- deallocate(batch);
+ if (batch.isInflight()) {
+ // KAFKA-19012: if the batch has been sent it might still be in use by the network client so we cannot allow it to be reused yet.
+ // We skip deallocating it now. When the request in network client completes with a response, either completeBatch() or
+ // failBatch() will be called with deallocateBatch=true. The buffer associated with the batch will be deallocated then.
Review Comment:
It would be useful to make it clear that completeBatch() and failBatch() are
in Sender.
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -833,7 +851,18 @@ private void failBatch(
log.debug("Encountered error when transaction manager was handling a failed batch", e);
}
}
- maybeRemoveAndDeallocateBatch(batch);
+ if (deallocateBatch) {
+ maybeRemoveAndDeallocateBatch(batch);
+ } else {
+ // Fix for KAFKA-19012
+ // The pooled ByteBuffer associated with this batch might still be in use by the network client so we
+ // cannot allow it to be reused yet. We skip deallocating it now. When the request in the network client
+ // completes with a response, either completeBatch() or failBatch() will be called with deallocateBatch=true.
+ // The buffer associated with the batch will be deallocated then.
+ maybeRemoveAndDeallocateBatchLater(batch);
+ }
+ } else {
+ this.accumulator.deallocate(batch);
Review Comment:
This is not an issue for now. But it will be useful to check deallocateBatch
before deallocate().
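A minimal sketch of what such a guard might look like, using a toy model and not the real Sender or RecordAccumulator code. ToyPool, ToyBatch and DeferredDeallocationSketch are hypothetical names, and the sketch assumes the outer condition in failBatch() is a one-shot completion check that returns false once the batch has already been completed or aborted.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model only: none of these classes are Kafka's real implementation.
final class DeferredDeallocationSketch {

    /** Hypothetical stand-in for the producer's buffer pool. */
    static final class ToyPool {
        private final Deque<ByteBuffer> free = new ArrayDeque<>();

        void deallocate(ByteBuffer buffer) {
            buffer.clear();          // reset position/limit before reuse
            free.addLast(buffer);    // buffer may now be handed out again
        }

        int freeCount() {
            return free.size();
        }
    }

    /** Hypothetical stand-in for a batch backed by a pooled buffer. */
    static final class ToyBatch {
        final ByteBuffer buffer = ByteBuffer.allocate(16);
        private boolean done;

        /** Returns true only on the first call (assumed one-shot completion). */
        boolean completeExceptionally() {
            if (done) {
                return false;
            }
            done = true;
            return true;
        }
    }

    private final ToyPool pool;

    DeferredDeallocationSketch(ToyPool pool) {
        this.pool = pool;
    }

    void failBatch(ToyBatch batch, boolean deallocateBatch) {
        if (batch.completeExceptionally()) {
            if (deallocateBatch) {
                pool.deallocate(batch.buffer);
            }
            // Otherwise the buffer may still be in use by the network layer;
            // a later call with deallocateBatch=true is expected to release it.
        } else if (deallocateBatch) {
            // The guard suggested above: release the buffer of an
            // already-completed batch only when the caller allows it.
            pool.deallocate(batch.buffer);
        }
    }
}
```

In this model, calling failBatch(batch, false) any number of times leaves the pool untouched, and the buffer is returned exactly once when failBatch(batch, true) is eventually called.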