AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r492596462
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -454,42 +431,106 @@ public void onBuffer(Buffer buffer, int sequenceNumber,
int backlog) throws IOEx
}
wasEmpty = receivedBuffers.isEmpty();
- receivedBuffers.add(buffer);
- if (listener != null && buffer.isBuffer() &&
receivedCheckpointId < lastRequestedCheckpointId) {
- notifyReceivedBuffer =
buffer.retainBuffer();
+ AbstractEvent priorityEvent =
parsePriorityEvent(buffer);
+ if (priorityEvent != null) {
+
receivedBuffers.addPriorityElement(buffer);
+ final int pos =
receivedBuffers.getNumPriorityElements();
+ if (priorityEvent instanceof
CheckpointBarrier) {
+ final long barrierId =
((CheckpointBarrier) priorityEvent).getId();
+ // don't spill future buffers
for this checkpoint
+ if
(!pendingCheckpointBarriers.remove(barrierId)) {
+ // checkpoint was not
yet started by task thread,
+ // so remember the
numbers of buffers to spill for the time when it will be started
+
numBuffersOvertaken.put(barrierId, receivedBuffers.size() - pos);
+ }
+ }
+ firstPriorityEvent = pos == 1;
} else {
- notifyReceivedBuffer = null;
+ receivedBuffers.add(buffer);
+ if (buffer.isBuffer()) {
+ for (final long checkpointId :
pendingCheckpointBarriers) {
+
channelStateWriter.addInputData(
+ checkpointId,
+ channelInfo,
+ sequenceNumber,
+
CloseableIterator.ofElement(buffer.retainBuffer(), Buffer::recycleBuffer));
+ }
+ }
}
- notifyReceivedBarrier = listener != null ?
parseCheckpointBarrierOrNull(buffer) : null;
}
recycleBuffer = false;
++expectedSequenceNumber;
+ if (firstPriorityEvent) {
+ notifyPriorityEvent();
+ }
if (wasEmpty) {
notifyChannelNonEmpty();
}
if (backlog >= 0) {
onSenderBacklog(backlog);
}
-
- if (notifyReceivedBarrier != null) {
- receivedCheckpointId =
notifyReceivedBarrier.getId();
- if (notifyReceivedBarrier.isCheckpoint()) {
-
listener.notifyBarrierReceived(notifyReceivedBarrier, channelInfo);
- }
- } else if (notifyReceivedBuffer != null) {
-
listener.notifyBufferReceived(notifyReceivedBuffer, channelInfo);
- }
} finally {
if (recycleBuffer) {
buffer.recycleBuffer();
}
}
}
+ /**
+ * Spills all queued buffers on checkpoint start. If the barrier of this checkpoint has
+ * already been received (and reordered ahead of the queued buffers), spills only the
+ * buffers that the barrier overtook.
+ *
+ * <p>Requires that a channel state writer has been injected; otherwise the
+ * {@code checkState} precondition at the start of this method fails. All bookkeeping
+ * is done while holding the {@code receivedBuffers} lock.
+ *
+ * @param barrier the barrier of the checkpoint that is being started
+ */
+ public void checkpointStarted(CheckpointBarrier barrier) {
+ checkState(channelStateWriter != null, "Channel state writer
not injected");
+ synchronized (receivedBuffers) {
+ final Integer numBuffers =
numBuffersOvertaken.get(barrier.getId()); // recorded by onBuffer when this barrier arrived before the checkpoint was started
+ if (numBuffers != null) {
+ // already received barrier before the task
thread picked up the barrier of this or another channel
+ spillBuffers(barrier.getId(), numBuffers);
+ } else {
+ // barrier not yet received, spill all current
and future buffers
+ spillBuffers(barrier.getId(),
receivedBuffers.getNumUnprioritizedElements());
+ pendingCheckpointBarriers.add(barrier.getId()); // onBuffer keeps persisting incoming data buffers for this id until its barrier arrives
+ }
+ }
+ }
+
+ public void checkpointStopped(long checkpointId) { // discards all spilling bookkeeping kept for checkpointId
+ synchronized (receivedBuffers) { // same lock that checkpointStarted uses for this state
+ numBuffersOvertaken.remove(checkpointId);
+ pendingCheckpointBarriers.remove(checkpointId);
+ }
+ }
+
+ private void spillBuffers(long checkpointId, int numBuffers) {
Review comment:
I added a helper class `ChannelStatePersister` and now use `persist`
everywhere, so that the two names are clearly separated: `Persister` operates on
the logical level, while `Writer` is the physical implementation.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]