AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r492591478
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -173,50 +181,51 @@ private boolean addBuffer(BufferConsumer bufferConsumer) {
buffers.add(bufferConsumer);
return false;
}
- checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " +
- "checkpoints");
final int pos = buffers.getNumPriorityElements();
buffers.addPriorityElement(bufferConsumer);
- boolean unalignedCheckpoint = isUnalignedCheckpoint(bufferConsumer);
- if (unalignedCheckpoint) {
+ CheckpointBarrier barrier = parseCheckpointBarrier(bufferConsumer);
+ if (barrier != null) {
+ checkState(
+ barrier.getCheckpointOptions().isUnalignedCheckpoint(),
+ "Only unaligned checkpoints should be priority events");
final Iterator<BufferConsumer> iterator = buffers.iterator();
Iterators.advance(iterator, pos + 1);
+ List<Buffer> inflightBuffers = new ArrayList<>();
while (iterator.hasNext()) {
BufferConsumer buffer = iterator.next();
if (buffer.isBuffer()) {
try (BufferConsumer bc = buffer.copy()) {
- inflightBufferSnapshot.add(bc.build());
+ inflightBuffers.add(bc.build());
}
}
}
+ channelStateWriter.addOutputData(
+ barrier.getId(),
+ subpartitionInfo,
+ ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+ inflightBuffers.toArray(new Buffer[0]));
Review comment:
This is no longer relevant in the final version, where the channel does the
spilling itself (so this method has no return value). I can make that clearer
in the commit message if you want.