pnowojski commented on a change in pull request #15885:
URL: https://github.com/apache/flink/pull/15885#discussion_r633394247



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
##########
@@ -120,38 +120,36 @@ public void checkpointStopped(long checkpointId) {
     }
 
     public void onRecoveredStateBuffer(Buffer buffer) {
-        boolean recycleBuffer = true;
         NetworkActionsLogger.traceRecover(
                 "InputChannelRecoveredStateHandler#recover",
                 buffer,
                 inputGate.getOwningTaskName(),
                 channelInfo);
-        try {
-            final boolean wasEmpty;
-            synchronized (receivedBuffers) {
-                // Similar to notifyBufferAvailable(), make sure that we never add a buffer
-                // after releaseAllResources() released all buffers from receivedBuffers.
-                if (isReleased) {
-                    wasEmpty = false;
-                } else {
-                    wasEmpty = receivedBuffers.isEmpty();
-                    receivedBuffers.add(buffer);
-                    recycleBuffer = false;
-                }
-            }
 
-            if (wasEmpty) {
-                notifyChannelNonEmpty();
-            }
-        } finally {
-            if (recycleBuffer) {
-                buffer.recycleBuffer();
+        final boolean wasEmpty;
+        synchronized (receivedBuffers) {
+            // Similar to notifyBufferAvailable(), make sure that we never add a buffer
+            // after releaseAllResources() released all buffers from receivedBuffers.
+            if (isReleased) {
+                wasEmpty = false;
+            } else {
+                wasEmpty = receivedBuffers.isEmpty();
+                receivedBuffers.add(buffer.retainBuffer());

Review comment:
       This makes this method inconsistent with `RemoteInputChannel#onBuffer`, 
which is confusing. The old way also seems more natural and explicit to me: 
the `onBuffer()` call transfers ownership of the buffer to the 
`Remote/RecoveredInputChannel`, and if a caller wants to reuse the buffer 
elsewhere, the caller should be the one retaining it. 
   
   Either way, I think the Javadoc of this method should document the 
contract: whether ownership of the passed `buffer` argument is taken by this 
instance or not.
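
   A minimal sketch of the ownership-transfer contract described above. All
   names here (`OwnershipSketch`, `CountedBuffer`, `Channel`) are hypothetical
   stand-ins, not Flink's actual `Buffer` or `RecoveredInputChannel` classes.
   It assumes a simple reference count: the channel takes over the reference
   it is handed, and a caller that wants to keep using the buffer retains it
   before the call.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical illustration only; not Flink's real classes or API.
class OwnershipSketch {

    /** Minimal reference-counted buffer; the creator holds the first reference. */
    static final class CountedBuffer {
        private int refCount = 1;

        CountedBuffer retain() {
            if (refCount == 0) {
                throw new IllegalStateException("already recycled");
            }
            refCount++;
            return this;
        }

        void recycle() {
            if (refCount == 0) {
                throw new IllegalStateException("already recycled");
            }
            refCount--;
        }

        int refCount() {
            return refCount;
        }
    }

    static final class Channel {
        private final Queue<CountedBuffer> received = new ArrayDeque<>();
        private boolean released;

        /**
         * Takes ownership of {@code buffer}. The channel either queues it or,
         * if the channel was already released, recycles it. A caller that
         * still needs the buffer must call {@code retain()} before passing it.
         */
        void onBuffer(CountedBuffer buffer) {
            boolean recycle = true;
            try {
                synchronized (received) {
                    if (!released) {
                        received.add(buffer);
                        recycle = false;
                    }
                }
            } finally {
                if (recycle) {
                    buffer.recycle();
                }
            }
        }

        /** Drops the channel's reference to every queued buffer. */
        void releaseAll() {
            synchronized (received) {
                released = true;
                CountedBuffer b;
                while ((b = received.poll()) != null) {
                    b.recycle();
                }
            }
        }
    }

    public static void main(String[] args) {
        Channel channel = new Channel();
        CountedBuffer buffer = new CountedBuffer();
        // The caller keeps its own reference, so it retains before handing over.
        channel.onBuffer(buffer.retain());
        System.out.println(buffer.refCount()); // prints 2
        channel.releaseAll();
        System.out.println(buffer.refCount()); // prints 1
        buffer.recycle();
        System.out.println(buffer.refCount()); // prints 0
    }
}
```

   With this contract the retaining happens at the call site, so the channel
   method itself never needs to know whether the buffer is shared.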

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
##########
@@ -153,7 +153,12 @@ public BufferRecycler getRecycler() {
     }
 
     public void recycle() {
-        recycler.recycle(memorySegment);
+        // If at least one consumer was created then they responsible for the memory recycling
+        // because BufferBuilder doesn't contain a references counter so it will be impossible to
+        // correctly recycle memory here.
+        if (!bufferConsumerCreated) {
+            recycler.recycle(memorySegment);
+        }

Review comment:
       Frankly, this (still?) seems like a partial solution/hack: what is the 
contract for when this method should be called? Before this `recycle()` method 
was introduced, it was at least clear that `BufferBuilder` never recycles the 
segment, and that this is always done by closing the `BufferConsumer`s. Now it 
seems we are going deeper into murky waters, where `recycle()` should 
"sometimes" be called.
   
   I think I would like the "hack" of relying on the `bufferConsumerCreated` 
flag to avoid retaining the buffer in the `BufferBuilder`, but I think it's 
still confusing:
   1. It would probably be better to make `BufferBuilder` implement `Closeable` 
and rename `recycle()` to `close()`. This would probably cause quite a few 
changes, especially in tests.
   2. It still doesn't solve the problem of writing to an already released 
`memorySegment`: what if the `BufferConsumer` was created and has already been 
closed while someone is still writing data to the `BufferBuilder`? There was a 
bug like that, which I fixed on a different layer, but maybe we should fix it 
here as well after all? In particular, we could measure on the benchmarking 
machine whether retaining and recycling the `NetworkBuffer` one extra time adds 
any overhead.
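
   The alternative raised in the two points above could look roughly like the
   sketch below. It is only an illustration under stated assumptions: the
   classes `BuilderCloseSketch`, `CountedSegment`, `Builder` and `Consumer`
   are hypothetical and do not mirror Flink's real `BufferBuilder`,
   `BufferConsumer` or `NetworkBuffer` APIs. The builder holds its own
   reference until `close()`, so a consumer closing early cannot free the
   segment while the builder is still writing.

```java
import java.io.Closeable;

// Hypothetical illustration only; not Flink's real classes or API.
class BuilderCloseSketch {

    /** Stand-in for a reference-counted memory segment. */
    static final class CountedSegment {
        private int refCount = 1; // the first reference belongs to the builder
        private boolean freed;

        void retain() {
            if (freed) {
                throw new IllegalStateException("segment already freed");
            }
            refCount++;
        }

        void release() {
            if (freed) {
                throw new IllegalStateException("segment already freed");
            }
            if (--refCount == 0) {
                freed = true;
            }
        }

        boolean isFreed() {
            return freed;
        }
    }

    static final class Consumer implements Closeable {
        private final CountedSegment segment;
        private boolean closed;

        Consumer(CountedSegment segment) {
            this.segment = segment;
        }

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                segment.release();
            }
        }
    }

    static final class Builder implements Closeable {
        private final CountedSegment segment;
        private boolean closed;
        private int written;

        Builder(CountedSegment segment) {
            this.segment = segment;
        }

        /** Each consumer gets its own reference, one extra retain per consumer. */
        Consumer createConsumer() {
            segment.retain();
            return new Consumer(segment);
        }

        /** Writing stays safe because the builder still holds a reference. */
        void append() {
            if (closed) {
                throw new IllegalStateException("builder already closed");
            }
            written++;
        }

        int written() {
            return written;
        }

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                segment.release();
            }
        }
    }

    public static void main(String[] args) {
        CountedSegment segment = new CountedSegment();
        Builder builder = new Builder(segment);
        Consumer consumer = builder.createConsumer();

        consumer.close();                      // consumer finishes first
        builder.append();                      // still safe: builder holds a reference
        System.out.println(segment.isFreed()); // prints false

        builder.close();                       // last reference dropped
        System.out.println(segment.isFreed()); // prints true
    }
}
```

   The cost of this variant is one additional retain/release pair per buffer,
   which is exactly the overhead the benchmark question above is about.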





