junrao commented on code in PR #21065:
URL: https://github.com/apache/kafka/pull/21065#discussion_r2670500662


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -833,7 +849,17 @@ private void failBatch(
                     log.debug("Encountered error when transaction manager was handling a failed batch", e);
                 }
             }
-            maybeRemoveAndDeallocateBatch(batch);
+            if (deallocateBatch) {
+                maybeRemoveAndDeallocateBatch(batch);
+            } else {
+                // Fix for KAFKA-19012
+                // The pooled ByteBuffer associated with this batch might still be in use by the network client so we
+                // cannot allow it to be reused yet. We skip deallocating it now, instead doing that in completeBatch
+                // or in another run of failBatch where we hit the else clause and call deallocateAlreadyRemovedIncomplete
+                maybeRemoveAndDeallocateBatchLater(batch);

Review Comment:
   > We skip deallocating it now, instead doing that in completeBatch
   > or in another run of failBatch where we hit the else clause and call deallocateAlreadyRemovedIncomplete
   
   How about "We skip deallocating it now. When the request in network client 
completes with a response, either completeBatch() or failBatch() will be called 
with deallocateBatch=true. The buffer associated with the batch will be 
deallocated then."?
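A minimal Java sketch of the deferral described in the suggested comment. It assumes simplified, hypothetical classes (`SketchBufferPool`, `SketchBatch`, `SketchSender`) rather than the real producer internals: failing an in-flight batch with `deallocateBatch=false` keeps its buffer out of the pool, and the later call made when the response is handled releases it, at most once.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified stand-in for the producer's buffer pool:
// it hands out ByteBuffers and takes them back for reuse.
class SketchBufferPool {
    private final Deque<ByteBuffer> free = new ArrayDeque<>();
    private final int poolableSize;

    SketchBufferPool(int poolableSize) {
        this.poolableSize = poolableSize;
    }

    ByteBuffer allocate() {
        ByteBuffer b = free.pollFirst();
        return b != null ? b : ByteBuffer.allocate(poolableSize);
    }

    void deallocate(ByteBuffer buffer) {
        buffer.clear();
        free.addLast(buffer);
    }

    int freeCount() {
        return free.size();
    }
}

// Hypothetical batch: a pooled buffer plus the "already deallocated" flag.
class SketchBatch {
    final ByteBuffer buffer;
    private boolean bufferDeallocated = false;

    SketchBatch(ByteBuffer buffer) { this.buffer = buffer; }

    boolean isBufferDeallocated() { return bufferDeallocated; }
    void markBufferDeallocated() { bufferDeallocated = true; }
}

// Hypothetical sender-side logic (not the real Sender).
class SketchSender {
    private final SketchBufferPool pool;

    SketchSender(SketchBufferPool pool) { this.pool = pool; }

    void failBatch(SketchBatch batch, boolean deallocateBatch) {
        // ... completing the batch's futures exceptionally is omitted ...
        if (deallocateBatch) {
            deallocateOnce(batch);
        }
        // Otherwise the network client may still be reading the buffer, so it is
        // left alone; a later completeBatch() or failBatch(batch, true) releases it.
    }

    void completeBatch(SketchBatch batch) {
        // ... completing the batch's futures is omitted ...
        deallocateOnce(batch);
    }

    // Guards against returning the same buffer to the pool twice.
    private void deallocateOnce(SketchBatch batch) {
        if (!batch.isBufferDeallocated()) {
            batch.markBufferDeallocated();
            pool.deallocate(batch.buffer);
        }
    }
}
```

The once-only guard matters because, under this scheme, more than one completion path can reach the same batch; without it the buffer could be handed out to two batches at the same time.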



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -72,6 +72,7 @@ private enum FinalState { ABORTED, FAILED, SUCCEEDED }
     private final AtomicInteger attempts = new AtomicInteger(0);
     private final boolean isSplitBatch;
     private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);
+    private boolean bufferDeallocated;

Review Comment:
   Could we explicitly initialize it to false?



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -581,6 +582,14 @@ public boolean sequenceHasBeenReset() {
         return reopened;
     }
 
+    public boolean isBufferDeallocated() {
+        return bufferDeallocated;
+    }
+
+    public void setBufferDeallocated(boolean bufferDeallocated) {

Review Comment:
   Could we name this method markBufferDeallocated and remove the input param?
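A minimal sketch of how the field and accessors might read with both suggestions applied (explicit `false` initialization, parameterless `markBufferDeallocated()`). `BatchBufferState` is a hypothetical stand-in, not the real `ProducerBatch`. Presumably the motivation is that deallocation is a one-way transition, so a setter taking a boolean invites misuse.

```java
// Hypothetical stand-in for the relevant part of ProducerBatch.
class BatchBufferState {
    // Explicitly initialized as suggested (Java would default it to false anyway).
    private boolean bufferDeallocated = false;

    public boolean isBufferDeallocated() {
        return bufferDeallocated;
    }

    // One-way transition: there is no way to flip the flag back to false.
    public void markBufferDeallocated() {
        bufferDeallocated = true;
    }
}
```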



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1029,12 +1029,36 @@ BuiltInPartitioner createBuiltInPartitioner(LogContext logContext, String topic,
     /**
      * Deallocate the record batch
      */
-    public void deallocate(ProducerBatch batch) {
+    public void completeBatchAndDeallocate(ProducerBatch batch) {

Review Comment:
   completeBatchAndDeallocate => completeAndDeallocateBatch to match what's in RecordAccumulator?



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1132,7 +1156,7 @@ void abortBatches(final RuntimeException reason) {
                 dq.remove(batch);
             }
             batch.abort(reason);
-            deallocate(batch);
+            completeBatchAndDeallocate(batch);

Review Comment:
   Thanks for doing the test. abortBatches() is called in the following two cases.
   1. When the producer is closed (related to the failure in SenderTest.testForceCloseWithProducerIdReset).
      This does not cause a real issue, since the networkClient won't send new requests after closing.
   2. When the producer's txnManager is in a fatal state (related to the failure in
      SenderTest.testCancelInFlightRequestAfterFatalError) or when handling an authorization error.
      This could be a real issue, since the network client may still be using the buffer of an
      in-flight batch after it has been returned to the pool.
   
   I am thinking that we can fix this in the following way. In abortBatches(), if a batch is in flight,
   we call completeBatch(); otherwise, we call completeBatchAndDeallocate(). We can add a boolean in
   ProducerBatch to indicate whether it's in flight or not. The in-flight batches are already tracked in
   Sender, but adding a flag in ProducerBatch is probably more convenient.
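A minimal Java sketch of the branching proposed for abortBatches(). Everything here is hypothetical: `AbortableBatch`, `AbortSketchAccumulator`, the `inFlight` flag and its setter, and the assumption that `completeBatch()` only removes the batch from the incomplete set while `completeBatchAndDeallocate()` additionally releases the buffer (modelled as setting a flag). Who sets and clears the in-flight flag (presumably Sender, when it sends a request and when it handles the response) is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical batch carrying the in-flight flag suggested in the review.
class AbortableBatch {
    private boolean inFlight = false;
    private boolean aborted = false;
    private boolean bufferDeallocated = false;

    void setInFlight(boolean inFlight) { this.inFlight = inFlight; }
    boolean isInFlight() { return inFlight; }
    void abort(RuntimeException reason) { aborted = true; }
    boolean isAborted() { return aborted; }
    void markBufferDeallocated() { bufferDeallocated = true; }
    boolean isBufferDeallocated() { return bufferDeallocated; }
}

// Hypothetical accumulator showing only the proposed abort logic.
class AbortSketchAccumulator {
    final List<AbortableBatch> incomplete = new ArrayList<>();

    void abortBatches(RuntimeException reason) {
        // Iterate over a copy because completing a batch removes it from the list.
        for (AbortableBatch batch : new ArrayList<>(incomplete)) {
            batch.abort(reason);
            if (batch.isInFlight()) {
                // The network client may still reference the buffer, so only
                // complete the batch; the buffer is released when the response is handled.
                completeBatch(batch);
            } else {
                completeBatchAndDeallocate(batch);
            }
        }
    }

    void completeBatch(AbortableBatch batch) {
        incomplete.remove(batch);
    }

    void completeBatchAndDeallocate(AbortableBatch batch) {
        completeBatch(batch);
        batch.markBufferDeallocated(); // stands in for returning the buffer to the pool
    }
}
```

Both batches end up aborted and removed from the incomplete set, but only the batch that is not in flight has its buffer released immediately.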



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to