AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r492595477



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -454,42 +431,106 @@ public void onBuffer(Buffer buffer, int sequenceNumber, 
int backlog) throws IOEx
                                }
 
                                wasEmpty = receivedBuffers.isEmpty();
-                               receivedBuffers.add(buffer);
 
-                               if (listener != null && buffer.isBuffer() && 
receivedCheckpointId < lastRequestedCheckpointId) {
-                                       notifyReceivedBuffer = 
buffer.retainBuffer();
+                               AbstractEvent priorityEvent = 
parsePriorityEvent(buffer);
+                               if (priorityEvent != null) {
+                                       
receivedBuffers.addPriorityElement(buffer);
+                                       final int pos = 
receivedBuffers.getNumPriorityElements();
+                                       if (priorityEvent instanceof 
CheckpointBarrier) {
+                                               final long barrierId = 
((CheckpointBarrier) priorityEvent).getId();
+                                               // don't spill future buffers 
for this checkpoint
+                                               if 
(!pendingCheckpointBarriers.remove(barrierId)) {
+                                                       // checkpoint was not 
yet started by task thread,
+                                                       // so remember the 
numbers of buffers to spill for the time when it will be started
+                                                       
numBuffersOvertaken.put(barrierId, receivedBuffers.size() - pos);
+                                               }
+                                       }
+                                       firstPriorityEvent = pos == 1;
                                } else {
-                                       notifyReceivedBuffer = null;
+                                       receivedBuffers.add(buffer);
+                                       if (buffer.isBuffer()) {
+                                               for (final long checkpointId : 
pendingCheckpointBarriers) {
+                                                       
channelStateWriter.addInputData(
+                                                               checkpointId,
+                                                               channelInfo,
+                                                               sequenceNumber,
+                                                               
CloseableIterator.ofElement(buffer.retainBuffer(), Buffer::recycleBuffer));
+                                               }
+                                       }

Review comment:
       Much smaller now thanks to the helper class.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to