junrao commented on code in PR #21065:
URL: https://github.com/apache/kafka/pull/21065#discussion_r2677364176


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1132,7 +1157,14 @@ void abortBatches(final RuntimeException reason) {
                 dq.remove(batch);
             }
             batch.abort(reason);
-            deallocate(batch);
+            if (batch.isInflight()) {
+                // KAFKA-19012: if the batch has been sent it might still be in use by the network client so we cannot allow it to be reused yet.
+                // We skip deallocating it now. When the request in network client completes with a response, either completeBatch() or 
+                // failBatch() will be called with deallocateBatch=true. The buffer associated with the batch will be deallocated then.

Review Comment:
   It would be useful to make it clear that completeBatch() and failBatch() are 
in Sender.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -833,7 +851,18 @@ private void failBatch(
                     log.debug("Encountered error when transaction manager was handling a failed batch", e);
                 }
             }
-            maybeRemoveAndDeallocateBatch(batch);
+            if (deallocateBatch) {
+                maybeRemoveAndDeallocateBatch(batch);
+            } else {
+                // Fix for KAFKA-19012
+                // The pooled ByteBuffer associated with this batch might still be in use by the network client so we
+                // cannot allow it to be reused yet. We skip deallocating it now. When the request in the network client 
+                // completes with a response, either completeBatch() or failBatch() will be called with deallocateBatch=true.
+                // The buffer associated with the batch will be deallocated then.
+                maybeRemoveAndDeallocateBatchLater(batch);
+            }
+        } else {
+            this.accumulator.deallocate(batch);

Review Comment:
   This is not an issue for now, but it would be useful to check deallocateBatch 
before calling deallocate().
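
   For illustration, the deferral described in the code comments above can be 
sketched in isolation. This is a minimal sketch, not Kafka's implementation: 
Pool, Batch, abort(), and onResponse() are hypothetical stand-ins for the 
buffer pool, the producer batch, abortBatches(), and the completeBatch() / 
failBatch() path in Sender. It assumes a single thread and one buffer per batch.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of "do not return an in-flight buffer to the pool".
final class DeferredDeallocationSketch {

    /** Hypothetical stand-in for a buffer pool; not Kafka's BufferPool. */
    static final class Pool {
        private final Deque<ByteBuffer> free = new ArrayDeque<>();

        ByteBuffer allocate(int size) {
            ByteBuffer pooled = free.poll();
            return pooled != null ? pooled : ByteBuffer.allocate(size);
        }

        void deallocate(ByteBuffer buffer) {
            buffer.clear();
            free.push(buffer);
        }

        int freeCount() {
            return free.size();
        }
    }

    /** Hypothetical stand-in for a producer batch. */
    static final class Batch {
        final ByteBuffer buffer;
        boolean inflight;     // true while the network layer may still read the buffer
        boolean aborted;
        boolean deallocated;  // guards against returning the buffer twice

        Batch(ByteBuffer buffer) {
            this.buffer = buffer;
        }
    }

    private final Pool pool = new Pool();

    Batch newBatch() {
        return new Batch(pool.allocate(16));
    }

    /** Abort path: release the buffer now only if nothing else can be using it. */
    void abort(Batch batch) {
        batch.aborted = true;
        if (!batch.inflight) {
            release(batch);
        }
        // Otherwise deallocation is deferred to onResponse().
    }

    /** Response path: the request has completed, so the buffer is safe to reuse. */
    void onResponse(Batch batch) {
        batch.inflight = false;
        release(batch);
    }

    private void release(Batch batch) {
        if (!batch.deallocated) {
            batch.deallocated = true;
            pool.deallocate(batch.buffer);
        }
    }

    int freeBuffers() {
        return pool.freeCount();
    }

    public static void main(String[] args) {
        DeferredDeallocationSketch acc = new DeferredDeallocationSketch();
        Batch batch = acc.newBatch();
        batch.inflight = true;                  // the batch has been handed to the network layer
        acc.abort(batch);
        System.out.println(acc.freeBuffers());  // prints 0: deallocation was deferred
        acc.onResponse(batch);
        System.out.println(acc.freeBuffers());  // prints 1: buffer returned after the response
    }
}
```

   The deallocated flag makes release() idempotent, so a batch that is aborted 
and later completed returns its buffer to the pool only once.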



