AHeise commented on a change in pull request #14052: URL: https://github.com/apache/flink/pull/14052#discussion_r525864426
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ########## @@ -506,43 +514,75 @@ public void checkpointStarted(CheckpointBarrier barrier) { synchronized (receivedBuffers) { channelStatePersister.startPersisting( barrier.getId(), - getInflightBuffers(numBuffersOvertaken == ALL ? receivedBuffers.getNumUnprioritizedElements() : numBuffersOvertaken)); + getInflightBuffers()); } } public void checkpointStopped(long checkpointId) { synchronized (receivedBuffers) { channelStatePersister.stopPersisting(checkpointId); - numBuffersOvertaken = ALL; + lastOvertakenSequenceNumber = null; + } + } + + @VisibleForTesting + List<Buffer> getInflightBuffers() { + synchronized (receivedBuffers) { + return getInflightBuffersUnsafe(); } } /** * Returns a list of buffers, checking the first n non-priority buffers, and skipping all events. */ - private List<Buffer> getInflightBuffers(int numBuffers) { + private List<Buffer> getInflightBuffersUnsafe() { assert Thread.holdsLock(receivedBuffers); - if (numBuffers == 0) { - return Collections.emptyList(); - } - - final List<Buffer> inflightBuffers = new ArrayList<>(numBuffers); + final List<Buffer> inflightBuffers = new ArrayList<>(); Iterator<SequenceBuffer> iterator = receivedBuffers.iterator(); // skip all priority events (only buffers are stored anyways) Iterators.advance(iterator, receivedBuffers.getNumPriorityElements()); - // spill number of overtaken buffers or all of them if barrier has not been seen yet - for (int pos = 0; pos < numBuffers; pos++) { - Buffer buffer = iterator.next().buffer; - if (buffer.isBuffer()) { - inflightBuffers.add(buffer.retainBuffer()); + while (iterator.hasNext()) { + SequenceBuffer sequenceBuffer = iterator.next(); + if (sequenceBuffer.buffer.isBuffer() && shouldBeSpilled(sequenceBuffer.sequenceNumber)) { Review comment: Is there a scenario where `shouldBeSpilled` returns [`true`, `false`, `true`] for any given buffer 
sequence? I don't see it happening even with overflow. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org