dnadolny commented on code in PR #21065:
URL: https://github.com/apache/kafka/pull/21065#discussion_r2658098950


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java:
##########
@@ -274,6 +274,17 @@ public void deallocate(ByteBuffer buffer, int size) {
         }
     }
 
+    /**
+     * buffer is passed in only for use in tests
+     */
+    public void deallocateWithoutReuse(ByteBuffer buffer, int size) {
+        // We pass in a fresh buffer to be deallocated (added back to the pool if it is a poolable size)
+        // This is slightly inefficient if it's not a poolable size, but those tend to be small and
+        // this code is not hit often anyway
+        // This is useful if we're not 100% sure the buffer is not in use, which happens in some error cases
+        deallocate(ByteBuffer.allocate(size));

Review Comment:
   I'm not 100% sure this is what you intended, but I gave it a shot. The code is getting a bit uglier, but here is what I've done:
    - In Sender, expired in-flight batches are now treated differently than before. For all other batches, state handling and deallocation are unchanged.
    - For expired in-flight batches, all the regular logic still runs at the same point as before, except for the call to deallocate in our BufferPool, which is deferred until later.
    - We keep track of the expired batches that haven't been deallocated yet, and when we hit the request timeout we do the BufferPool free step. I didn't see an existing place in the code to hook into for this, so I've done it in the new method `deallocateRequestTimeoutBatches`.
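   
   To make the deferral concrete, here is a minimal standalone sketch of the bookkeeping described in the list above. It is not the PR's actual code: the class `DeferredDeallocationSketch`, the `PendingFree` record, `onInFlightBatchExpired`, and the use of "send time plus request timeout" as the free deadline are all assumptions for illustration. Only the method name `deallocateRequestTimeoutBatches` is borrowed from the PR, and its real signature may differ.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for the Sender-side bookkeeping; not the PR's actual code. */
class DeferredDeallocationSketch {

    /** An expired in-flight batch whose buffer has not been returned to the pool yet. */
    static final class PendingFree {
        final ByteBuffer buffer;
        final long freeAtMs; // assumed deadline: the time at which the request timeout has elapsed

        PendingFree(ByteBuffer buffer, long freeAtMs) {
            this.buffer = buffer;
            this.freeAtMs = freeAtMs;
        }
    }

    private final List<PendingFree> pending = new ArrayList<>();
    private final long requestTimeoutMs;

    DeferredDeallocationSketch(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    /** Called when an in-flight batch expires: record it, but do not free its buffer yet. */
    void onInFlightBatchExpired(ByteBuffer buffer, long sentAtMs) {
        pending.add(new PendingFree(buffer, sentAtMs + requestTimeoutMs));
    }

    /**
     * Removes and returns every buffer whose deadline has passed.
     * The caller would hand these to the pool's free step.
     */
    List<ByteBuffer> deallocateRequestTimeoutBatches(long nowMs) {
        List<ByteBuffer> due = new ArrayList<>();
        Iterator<PendingFree> it = pending.iterator();
        while (it.hasNext()) {
            PendingFree p = it.next();
            if (nowMs >= p.freeAtMs) {
                due.add(p.buffer);
                it.remove();
            }
        }
        return due;
    }
}
```

   The sketch scans the whole pending list instead of only its head, because batches recorded later can still have earlier deadlines if they were sent earlier.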
   
   Note that I've kept the logic of having the BufferPool create a fresh ByteBuffer here. It may no longer be necessary with the logic change above, but my reasoning for keeping it is:
    - It guarantees that we've fixed the corruption issue.
    - If the request timeout handling all happens as expected, the only cost is a tiny inefficiency from creating a new ByteBuffer in a pretty rare case. If there's any slight variation in timing, what would have been corruption instead becomes slightly more memory use than we should have, for a very short period of time.
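
   As an illustration of why the fresh buffer rules out corruption, here is a deliberately simplified pool. `TinyPoolSketch` is hypothetical and omits everything the real BufferPool does around memory limits and waiting threads. `deallocateWithoutReuse` mirrors the signature in the diff above, and the sketch ignores its `buffer` argument.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical, simplified pool; the real BufferPool also tracks memory limits and waiters. */
class TinyPoolSketch {
    private final int poolableSize;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    TinyPoolSketch(int poolableSize) {
        this.poolableSize = poolableSize;
    }

    /** Hands out a pooled buffer if one is available, otherwise allocates a new one. */
    ByteBuffer allocate() {
        ByteBuffer b = free.pollFirst();
        return b != null ? b : ByteBuffer.allocate(poolableSize);
    }

    /** Normal path: the same buffer object goes back on the free list and can be reused. */
    void deallocate(ByteBuffer buffer) {
        if (buffer.capacity() == poolableSize) {
            buffer.clear();
            free.addLast(buffer);
        }
    }

    /**
     * Defensive path: return capacity to the pool, but never the possibly-in-use object.
     * The buffer argument is intentionally unused in this sketch.
     */
    void deallocateWithoutReuse(ByteBuffer buffer, int size) {
        deallocate(ByteBuffer.allocate(size));
    }
}
```

   With this shape, a later `allocate()` can never return the object that some other code path might still be reading, so a late write lands in a different buffer. The cost is one extra allocation and a short window where both buffers are alive.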



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

