wsry commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r665222497



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -315,6 +326,17 @@ BufferAndBacklog pollBuffer() {
                 if (buffer.readableBytes() > 0) {
                     break;
                 }
+
+                // if we have an empty finished buffer and the exclusive credit is 0, we just return
+                // the empty buffer so that the downstream task can release the allocated credit for
+                // this empty buffer, this happens in two main scenarios currently:
+                // 1. all data of a buffer builder has been read and after that the buffer builder
+                // is finished
+                // 2. in approximate recovery mode, a partial record takes a whole buffer builder
+                if (buffersPerChannel == 0 && bufferConsumer.isFinished()) {
+                    break;
+                }
+

Review comment:
I don't think I fully understood the comment; could you please explain a bit
more? From my understanding, even if there are multiple buffers in the queue
and the first one is empty, if we do not send that empty buffer out, how can we
guarantee that the downstream task is not allocating too many buffers when the
exclusive credit is 0? For example, if there are two buffers, the first one
empty and the second one an event, we may already have announced a backlog of 1
to the downstream task. If we then send the event directly, does that mean we
cannot release the credit that was already allocated? Correct me if I am wrong.
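
To make the two-buffer example concrete, here is a toy Java model of it. It is only a sketch of the reasoning, not Flink's code: `CreditSketch`, `Item`, and `leftoverCredit` are made-up names. It assumes that the backlog counts only non-event buffers, that with exclusive credit 0 the receiver allocates one floating buffer per announced backlog, and that events consume no credit.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of the scenario in the question; hypothetical names, not Flink classes. */
class CreditSketch {

    /** Hypothetical stand-in for a queued data buffer or event. */
    static final class Item {
        final int readableBytes;
        final boolean event;

        Item(int readableBytes, boolean event) {
            this.readableBytes = readableBytes;
            this.event = event;
        }
    }

    /**
     * Drains a queue of [empty finished buffer, event] and returns how many
     * floating buffers the receiver still holds afterwards.
     */
    static int leftoverCredit(boolean sendEmptyFinishedBuffers) {
        Deque<Item> queue = new ArrayDeque<>();
        queue.add(new Item(0, false)); // empty, finished data buffer
        queue.add(new Item(8, true));  // event

        // Assumption: the announced backlog counts non-event buffers only.
        int announcedBacklog = 0;
        for (Item item : queue) {
            if (!item.event) {
                announcedBacklog++;
            }
        }

        // Assumption: exclusive credit is 0, so the receiver allocates one
        // floating buffer per announced backlog.
        int allocated = announcedBacklog;

        while (!queue.isEmpty()) {
            Item item = queue.poll();
            if (item.event) {
                continue; // assumption: events consume no credit
            }
            boolean skipped = item.readableBytes == 0 && !sendEmptyFinishedBuffers;
            if (!skipped) {
                allocated--; // the receiver uses up the buffer it allocated
            }
        }
        return allocated;
    }

    public static void main(String[] args) {
        System.out.println("skip empty buffer: " + leftoverCredit(false));
        System.out.println("send empty buffer: " + leftoverCredit(true));
    }
}
```

Under these assumptions, skipping the empty buffer leaves one floating buffer allocated on the receiver, while sending it brings the count back to zero, which is the case I am asking about.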



