pnowojski commented on a change in pull request #12120:
URL: https://github.com/apache/flink/pull/12120#discussion_r426226419



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileBasedBufferIterator.java
##########
@@ -85,6 +90,6 @@ private int read(byte[] buffer) {
 
        @Override
        public void close() throws Exception {
-               closeAll(stream, file::release);
+               closeAll(stream, file::release, () -> buffersToClose.forEach(Buffer::recycleBuffer));

Review comment:
       I don't understand how this works right now. Where are the non-last 
buffers recycled? And why does the last buffer have to be handled 
differently? As I mentioned before, I think:
   > The ownership of the buffers returned from the iterator should belong to 
the caller of next() method.
   
   So the buffers' ownership should be handed over, and the owner should always 
be responsible for recycling a buffer once it's done using it. For in-memory 
collections, all of the remaining buffers should be recycled by closing the 
iterator (as it would be the owner of the not-yet-returned buffers).
   
   I guess we are avoiding memory leaks right now only because the returned 
Buffers are not pooled and use the simple `FreeingBufferRecycler`? If so, that's 
not how buffers should be used, and it can break if someone else passes buffers 
with a different recycler.
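
   To make the ownership rule concrete, here is a minimal sketch. It assumes 
hypothetical stand-in types (`PooledBuffer`, `OwningIterator`) and a simple 
counter in place of a real pool; none of these are Flink's actual classes, and 
this is not the code from this PR. It only illustrates the contract: whoever 
receives a buffer from next() recycles it, and close() recycles whatever was 
never handed out.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class OwnershipSketch {

    /** Hypothetical stand-in for a pooled buffer (not Flink's Buffer). */
    static final class PooledBuffer {
        private final AtomicInteger returnedToPool;
        private boolean recycled;

        PooledBuffer(AtomicInteger returnedToPool) {
            this.returnedToPool = returnedToPool;
        }

        /** Must be called exactly once, by the current owner. */
        void recycle() {
            if (recycled) {
                throw new IllegalStateException("buffer recycled twice");
            }
            recycled = true;
            returnedToPool.incrementAndGet();
        }
    }

    /** Hypothetical iterator that owns only the buffers it has not returned yet. */
    static final class OwningIterator implements Iterator<PooledBuffer>, AutoCloseable {
        private final Deque<PooledBuffer> remaining;

        OwningIterator(List<PooledBuffer> buffers) {
            this.remaining = new ArrayDeque<>(buffers);
        }

        @Override
        public boolean hasNext() {
            return !remaining.isEmpty();
        }

        @Override
        public PooledBuffer next() {
            // Ownership moves to the caller here; the iterator forgets the buffer.
            return remaining.remove();
        }

        @Override
        public void close() {
            // Recycle only the buffers that were never handed out.
            while (!remaining.isEmpty()) {
                remaining.remove().recycle();
            }
        }
    }

    /** Consumes one of three buffers, closes the iterator, and reports the leaks. */
    static int leakedBuffers() {
        AtomicInteger returnedToPool = new AtomicInteger();
        List<PooledBuffer> buffers = Arrays.asList(
                new PooledBuffer(returnedToPool),
                new PooledBuffer(returnedToPool),
                new PooledBuffer(returnedToPool));

        try (OwningIterator it = new OwningIterator(buffers)) {
            PooledBuffer first = it.next();
            first.recycle(); // the caller owns it, so the caller recycles it
        } // close() recycles the two buffers that were never returned

        return buffers.size() - returnedToPool.get();
    }

    public static void main(String[] args) {
        System.out.println("leaked buffers: " + leakedBuffers());
    }
}
```

   With this split, each buffer is recycled exactly once regardless of which 
recycler backs it, so correctness no longer depends on the buffers being 
unpooled.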




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

