wsry commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r665829387



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -315,6 +326,17 @@ BufferAndBacklog pollBuffer() {
                 if (buffer.readableBytes() > 0) {
                     break;
                 }
+
+                // if we have an empty finished buffer and the exclusive credit is 0, we just return
+                // the empty buffer so that the downstream task can release the allocated credit for
+                // this empty buffer, this happens in two main scenarios currently:
+                // 1. all data of a buffer builder has been read and after that the buffer builder
+                // is finished
+                // 2. in approximate recovery mode, a partial record takes a whole buffer builder
+                if (buffersPerChannel == 0 && bufferConsumer.isFinished()) {
+                    break;
+                }
+

Review comment:
   Let's focus on the 3rd case first and assume that the exclusive credit is 0.
   
   1. There is only one data buffer in the queue.
   2. A flush is triggered.
   3. All data of the first buffer is committed, but the buffer is not yet finished.
   4. All data of the buffer is consumed by pollBuffer and the available credit becomes 0.
   5. The first buffer is finished, the second event is added, and the data available notification is triggered.
   6. The upstream announces the backlog to the downstream to request a credit.
   7. The upstream receives the available credit and starts to pollBuffer.
   8. pollBuffer skips the first (now empty) buffer and sends the second event.
   9. The downstream receives the event, but the event does not consume any credit.
   
   Do you mean we should change the current logic and release the floating buffer for an event in some cases (including reducing the available credit by 1 at the upstream; currently the available credit is not decreased for events)? If there are multiple empty buffers, should we skip only the first one, or should we skip all of them?
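
   To make the nine steps above concrete, here is a rough toy model of the credit accounting. It is only a sketch: `CreditSketch`, `Entry`, `poll` and `creditLeftAfterScenario` are hypothetical names and not Flink classes or methods, and the loop is a simplified stand-in for `pollBuffer`, assuming that events never consume credit and that each data buffer sent consumes exactly one.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of the scenario above; all names are hypothetical, not Flink APIs. */
final class CreditSketch {

    /** Simplified stand-in for a queued data buffer or event. */
    static final class Entry {
        final boolean isEvent;
        final int readableBytes;
        final boolean finished;

        Entry(boolean isEvent, int readableBytes, boolean finished) {
            this.isEvent = isEvent;
            this.readableBytes = readableBytes;
            this.finished = finished;
        }
    }

    final Deque<Entry> queue = new ArrayDeque<>();
    final int buffersPerChannel; // exclusive credit per channel
    int availableCredit;         // credit currently known to the upstream

    CreditSketch(int buffersPerChannel) {
        this.buffersPerChannel = buffersPerChannel;
    }

    /**
     * Returns the next entry to send, or null if nothing can be sent.
     * With returnEmptyFinished = true, an empty finished buffer is sent
     * (and consumes a credit) when the exclusive credit is 0, as in the diff.
     */
    Entry poll(boolean returnEmptyFinished) {
        while (!queue.isEmpty()) {
            Entry head = queue.peek();
            if (head.isEvent) {
                queue.poll();
                return head; // assumption: events do not decrease the credit
            }
            boolean sendEmpty =
                    returnEmptyFinished && buffersPerChannel == 0 && head.finished;
            if (head.readableBytes > 0 || sendEmpty) {
                if (availableCredit == 0) {
                    return null; // no credit, wait for the downstream
                }
                availableCredit--;
                queue.poll();
                return head;
            }
            if (!head.finished) {
                return null; // empty but unfinished: nothing to send yet
            }
            queue.poll(); // skip the empty finished buffer
        }
        return null;
    }

    /** Replays steps 5 to 9 and reports the upstream credit left afterwards. */
    static int creditLeftAfterScenario(boolean returnEmptyFinished) {
        CreditSketch s = new CreditSketch(0);
        s.queue.add(new Entry(false, 0, true)); // first buffer: fully consumed, now finished
        s.queue.add(new Entry(true, 1, true));  // the event added in step 5
        s.availableCredit = 1;                  // step 7: credit granted by the downstream
        s.poll(returnEmptyFinished);            // step 8
        return s.availableCredit;
    }
}
```

   In this model, skipping the empty buffer leaves the granted credit unused after the event is sent (`creditLeftAfterScenario(false)` is 1), whereas returning the empty finished buffer uses it up (`creditLeftAfterScenario(true)` is 0), which is the point at which the downstream could release the floating buffer.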





