AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r492593809



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
##########
@@ -210,15 +221,25 @@ public void spillInflightBuffers(long checkpointId, 
ChannelStateWriter channelSt
                }
 
                Buffer buffer = next.buffer();
-               CheckpointBarrier notifyReceivedBarrier = 
parseCheckpointBarrierOrNull(buffer);
-               if (notifyReceivedBarrier != null) {
-                       receivedCheckpointId = notifyReceivedBarrier.getId();
-               } else if (receivedCheckpointId < lastRequestedCheckpointId && 
buffer.isBuffer()) {
-                       
inputGate.getBufferReceivedListener().notifyBufferReceived(buffer.retainBuffer(),
 channelInfo);
-               }
 
                numBytesIn.inc(buffer.getSize());
                numBuffersIn.inc();
+               if (buffer.isBuffer()) {
+                       for (final long barrierId : pendingCheckpointBarriers) {
+                               channelStateWriter.addInputData(
+                                       barrierId,
+                                       getChannelInfo(),
+                                       
ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+                                       
CloseableIterator.ofElement(buffer.retainBuffer(), Buffer::recycleBuffer));

Review comment:
       `LocalInputChannel` only spills when it awaits barrier. So it spills the 
buffer on first sight and it cannot be better on downsteam level. We could of 
course also move spilling lingering buffers to the upstream. It might also be 
an improvement for later.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to