dnadolny commented on code in PR #21065:
URL: https://github.com/apache/kafka/pull/21065#discussion_r2674253553


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1027,14 +1027,34 @@ BuiltInPartitioner createBuiltInPartitioner(LogContext logContext, String topic,
     }
 
     /**
-     * Deallocate the record batch
+     * Complete and deallocate the record batch
+     */
+    public void completeAndDeallocateBatch(ProducerBatch batch) {
+        completeBatch(batch);
+        deallocate(batch);
+    }
+
+    /**
+     * Only perform deallocation (and not removal from the incomplete set)
      */
     public void deallocate(ProducerBatch batch) {
-        incomplete.remove(batch);
         // Only deallocate the batch if it is not a split batch because split batch are allocated outside the
         // buffer pool.
-        if (!batch.isSplitBatch())
-            free.deallocate(batch.buffer(), batch.initialCapacity());
+        if (!batch.isSplitBatch()) {
+            if (batch.isBufferDeallocated()) {
+                log.warn("Skipping deallocating a batch that has already been deallocated. Batch is {}, created time is {}", batch, batch.createdMs);
+            } else {
+                batch.markBufferDeallocated();

Review Comment:
   I've added it as an `IllegalStateException`, though this means we'll leak
memory if it happens: if we throw an exception here, we may never deallocate
the batch. The amount of memory leaked is small relative to the heap, but if
the BufferPool runs low against its configured size, allocation from it slows
down or blocks indefinitely.
   An alternative would be to use an `assert`, which runs during tests but, as
far as I know, isn't enabled by default when running for real. That won't leak
memory, but it allows the potential for corruption.
   I think the latter is worse, but wanted to bring it up.
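
   To make the trade-off concrete, here is a minimal sketch of the two guards.
`SketchPool` and `SketchBatch` are hypothetical stand-ins, not the real
`BufferPool` and `ProducerBatch`, and the pool only counts bytes, so this
illustrates the failure modes rather than the actual change in this PR.

```java
// Hypothetical stand-ins; these are NOT Kafka's BufferPool or ProducerBatch.
final class DoubleFreeGuardSketch {

    /** Tracks only how many bytes the pool believes are free. */
    static final class SketchPool {
        private long availableBytes;

        SketchPool(long availableBytes) {
            this.availableBytes = availableBytes;
        }

        void deallocate(int size) {
            availableBytes += size;
        }

        long availableBytes() {
            return availableBytes;
        }
    }

    /** Remembers whether its buffer has already been handed back. */
    static final class SketchBatch {
        final int capacity;
        private boolean bufferDeallocated;

        SketchBatch(int capacity) {
            this.capacity = capacity;
        }

        /** Returns true only on the first call. */
        boolean markBufferDeallocated() {
            boolean first = !bufferDeallocated;
            bufferDeallocated = true;
            return first;
        }
    }

    /** Option 1: always checked; a second call throws before touching the pool. */
    static void deallocateOrThrow(SketchPool pool, SketchBatch batch) {
        if (!batch.markBufferDeallocated()) {
            throw new IllegalStateException("Buffer already deallocated");
        }
        pool.deallocate(batch.capacity);
    }

    /** Option 2: checked only when the JVM runs with assertions enabled (-ea). */
    static void deallocateWithAssert(SketchPool pool, SketchBatch batch) {
        boolean first = batch.markBufferDeallocated();
        assert first : "Buffer already deallocated";
        pool.deallocate(batch.capacity);
    }

    public static void main(String[] args) {
        SketchPool pool = new SketchPool(0);
        SketchBatch batch = new SketchBatch(16);
        deallocateOrThrow(pool, batch);
        try {
            deallocateOrThrow(pool, batch);
        } catch (IllegalStateException e) {
            System.out.println("second deallocate rejected: " + e.getMessage());
        }
        System.out.println("available bytes: " + pool.availableBytes());
    }
}
```

   In this sketch, calling `deallocateWithAssert` twice on a JVM started
without `-ea` credits the pool twice for the same buffer, which is the
corruption risk. `deallocateOrThrow` never double-credits, but the exception
can abort whatever cleanup the caller still had to do, which is where the leak
would come from.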



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

