AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r492596462



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -454,42 +431,106 @@ public void onBuffer(Buffer buffer, int sequenceNumber, 
int backlog) throws IOEx
                                }
 
                                wasEmpty = receivedBuffers.isEmpty();
-                               receivedBuffers.add(buffer);
 
-                               if (listener != null && buffer.isBuffer() && 
receivedCheckpointId < lastRequestedCheckpointId) {
-                                       notifyReceivedBuffer = 
buffer.retainBuffer();
+                               AbstractEvent priorityEvent = 
parsePriorityEvent(buffer);
+                               if (priorityEvent != null) {
+                                       
receivedBuffers.addPriorityElement(buffer);
+                                       final int pos = 
receivedBuffers.getNumPriorityElements();
+                                       if (priorityEvent instanceof 
CheckpointBarrier) {
+                                               final long barrierId = 
((CheckpointBarrier) priorityEvent).getId();
+                                               // don't spill future buffers 
for this checkpoint
+                                               if 
(!pendingCheckpointBarriers.remove(barrierId)) {
+                                                       // checkpoint was not 
yet started by task thread,
+                                                       // so remember the 
numbers of buffers to spill for the time when it will be started
+                                                       
numBuffersOvertaken.put(barrierId, receivedBuffers.size() - pos);
+                                               }
+                                       }
+                                       firstPriorityEvent = pos == 1;
                                } else {
-                                       notifyReceivedBuffer = null;
+                                       receivedBuffers.add(buffer);
+                                       if (buffer.isBuffer()) {
+                                               for (final long checkpointId : 
pendingCheckpointBarriers) {
+                                                       
channelStateWriter.addInputData(
+                                                               checkpointId,
+                                                               channelInfo,
+                                                               sequenceNumber,
+                                                               
CloseableIterator.ofElement(buffer.retainBuffer(), Buffer::recycleBuffer));
+                                               }
+                                       }
                                }
-                               notifyReceivedBarrier = listener != null ? 
parseCheckpointBarrierOrNull(buffer) : null;
                        }
                        recycleBuffer = false;
 
                        ++expectedSequenceNumber;
 
+                       if (firstPriorityEvent) {
+                               notifyPriorityEvent();
+                       }
                        if (wasEmpty) {
                                notifyChannelNonEmpty();
                        }
 
                        if (backlog >= 0) {
                                onSenderBacklog(backlog);
                        }
-
-                       if (notifyReceivedBarrier != null) {
-                               receivedCheckpointId = 
notifyReceivedBarrier.getId();
-                               if (notifyReceivedBarrier.isCheckpoint()) {
-                                       
listener.notifyBarrierReceived(notifyReceivedBarrier, channelInfo);
-                               }
-                       } else if (notifyReceivedBuffer != null) {
-                               
listener.notifyBufferReceived(notifyReceivedBuffer, channelInfo);
-                       }
                } finally {
                        if (recycleBuffer) {
                                buffer.recycleBuffer();
                        }
                }
        }
 
+       /**
+        * Spills all queued buffers on checkpoint start. If barrier has 
already been received (and reordered), spill only
+        * the overtaken buffers.
+        */
+       public void checkpointStarted(CheckpointBarrier barrier) {
+               checkState(channelStateWriter != null, "Channel state writer 
not injected");
+               synchronized (receivedBuffers) {
+                       final Integer numBuffers = 
numBuffersOvertaken.get(barrier.getId());
+                       if (numBuffers != null) {
+                               // already received barrier before the task 
thread picked up the barrier of this or another channel
+                               spillBuffers(barrier.getId(), numBuffers);
+                       } else {
+                               // barrier not yet received, spill all current 
and future buffers
+                               spillBuffers(barrier.getId(), 
receivedBuffers.getNumUnprioritizedElements());
+                               pendingCheckpointBarriers.add(barrier.getId());
+                       }
+               }
+       }
+
+       public void checkpointStopped(long checkpointId) {
+               synchronized (receivedBuffers) {
+                       numBuffersOvertaken.remove(checkpointId);
+                       pendingCheckpointBarriers.remove(checkpointId);
+               }
+       }
+
+       private void spillBuffers(long checkpointId, int numBuffers) {

Review comment:
       I added a helper class `ChannelStatePersister` and used `persist` 
everywhere to clearly separate the two names: `Persister` operates on the 
logical level, while `Writer` is the physical implementation.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to