jolshan commented on code in PR #21065:
URL: https://github.com/apache/kafka/pull/21065#discussion_r2673283612


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1132,7 +1156,7 @@ void abortBatches(final RuntimeException reason) {
                 dq.remove(batch);
             }
             batch.abort(reason);
-            deallocate(batch);
+            completeBatchAndDeallocate(batch);

Review Comment:
   Sorry, a bit late to the party -- but yes, to my understanding we could have 
batches that were already sent. This is why it is important to abort.
   
   For the fix: 
   > I am thinking that we can fix this in the following way. In 
abortBatches(), if a batch is in-flight, we call completeBatch() and otherwise, 
call completeBatchAndDeallocate(). We can add a boolean in ProducerBatch to 
indicate whether it's in-flight or not. The in-flight batches are already 
tracked in Sender. But adding a flag in ProducerBatch is probably more 
convenient.
   
   Are these solutions mostly meant to correctly indicate what we should do 
with the buffers, so that we do not reuse them? I'm still trying to fully 
understand what the result here would look like for a transactional producer.
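
A rough sketch of the quoted proposal, for discussion only. `SketchBatch` and `SketchAccumulator` are hypothetical stand-ins, not the real `ProducerBatch` or `RecordAccumulator`, and the bodies of `completeBatch()` and `completeBatchAndDeallocate()` are assumptions inferred from their names: the first only stops tracking the batch, the second also returns its buffer for reuse.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for ProducerBatch; only the proposed flag is modeled.
final class SketchBatch {
    private volatile boolean inFlight;      // the boolean suggested in the quote
    private RuntimeException abortReason;

    void markInFlight() { inFlight = true; }   // assumed to be set when the batch is sent
    boolean isInFlight() { return inFlight; }
    void abort(RuntimeException reason) { abortReason = reason; }
    boolean isAborted() { return abortReason != null; }
}

// Hypothetical stand-in for RecordAccumulator.
final class SketchAccumulator {
    private final Deque<SketchBatch> incomplete = new ArrayDeque<>();
    // Stands in for the buffer pool: batches whose buffers were handed back.
    final List<SketchBatch> returnedToPool = new ArrayList<>();

    void add(SketchBatch batch) { incomplete.add(batch); }

    // Assumed behavior: stop tracking the batch, but keep its buffer.
    void completeBatch(SketchBatch batch) { incomplete.remove(batch); }

    // Assumed behavior: stop tracking the batch and release its buffer.
    void completeBatchAndDeallocate(SketchBatch batch) {
        completeBatch(batch);
        returnedToPool.add(batch);
    }

    void abortBatches(RuntimeException reason) {
        // Iterate over a copy because the loop removes from the deque.
        for (SketchBatch batch : new ArrayList<>(incomplete)) {
            batch.abort(reason);
            if (batch.isInFlight()) {
                // The buffer may still be referenced by an outstanding request.
                completeBatch(batch);
            } else {
                completeBatchAndDeallocate(batch);
            }
        }
    }

    int incompleteCount() { return incomplete.size(); }
}
```

In this sketch an in-flight batch is aborted and dropped from tracking, but its buffer is never returned by `abortBatches()`. Who releases it afterwards (for example, the response-handling path in Sender) is left open here, as it is in the quote.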



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
