AHeise commented on a change in pull request #14052:
URL: https://github.com/apache/flink/pull/14052#discussion_r525864426



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -506,43 +514,75 @@ public void checkpointStarted(CheckpointBarrier barrier) {
                synchronized (receivedBuffers) {
                        channelStatePersister.startPersisting(
                                barrier.getId(),
-                               getInflightBuffers(numBuffersOvertaken == ALL ? receivedBuffers.getNumUnprioritizedElements() : numBuffersOvertaken));
+                               getInflightBuffers());
                }
        }
 
        public void checkpointStopped(long checkpointId) {
                synchronized (receivedBuffers) {
                        channelStatePersister.stopPersisting(checkpointId);
-                       numBuffersOvertaken = ALL;
+                       lastOvertakenSequenceNumber = null;
+               }
+       }
+
+       @VisibleForTesting
+       List<Buffer> getInflightBuffers() {
+               synchronized (receivedBuffers) {
+                       return getInflightBuffersUnsafe();
                }
        }
 
        /**
         * Returns a list of buffers, checking the first n non-priority buffers, and skipping all events.
         */
-       private List<Buffer> getInflightBuffers(int numBuffers) {
+       private List<Buffer> getInflightBuffersUnsafe() {
                assert Thread.holdsLock(receivedBuffers);
 
-               if (numBuffers == 0) {
-                       return Collections.emptyList();
-               }
-
-               final List<Buffer> inflightBuffers = new ArrayList<>(numBuffers);
+               final List<Buffer> inflightBuffers = new ArrayList<>();
                Iterator<SequenceBuffer> iterator = receivedBuffers.iterator();
                // skip all priority events (only buffers are stored anyways)
                Iterators.advance(iterator, receivedBuffers.getNumPriorityElements());
 
-               // spill number of overtaken buffers or all of them if barrier has not been seen yet
-               for (int pos = 0; pos < numBuffers; pos++) {
-                       Buffer buffer = iterator.next().buffer;
-                       if (buffer.isBuffer()) {
-                               inflightBuffers.add(buffer.retainBuffer());
+               while (iterator.hasNext()) {
+                       SequenceBuffer sequenceBuffer = iterator.next();
+                       if (sequenceBuffer.buffer.isBuffer() && shouldBeSpilled(sequenceBuffer.sequenceNumber)) {

Review comment:
       Is there a scenario where `shouldBeSpilled` returns [`true`, `false`, `true`] for consecutive buffers in the queue? I don't see it happening, even with sequence-number overflow.
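
       To make the question concrete, here is a minimal standalone sketch, not the code from this PR. It assumes a hypothetical `shouldBeSpilled` that spills everything while no barrier has been seen (`null`) and otherwise compares sequence numbers with a wraparound-tolerant subtraction. The class name, the helper methods, and the inclusive comparison are all assumptions made for illustration. Under those assumptions the decisions over consecutive sequence numbers form a run of `true` followed by a run of `false`, even across `int` overflow:

```java
import java.util.ArrayList;
import java.util.List;

public class SpillOrderSketch {

    // Hypothetical stand-in for the predicate under review (an assumption,
    // not the PR's implementation): spill everything when no barrier has been
    // seen (null), otherwise spill buffers at or before the last overtaken
    // sequence number. The subtraction tolerates int wraparound as long as
    // the two numbers are less than 2^31 apart.
    static boolean shouldBeSpilled(int sequenceNumber, Integer lastOvertaken) {
        return lastOvertaken == null || sequenceNumber - lastOvertaken <= 0;
    }

    // Evaluates the predicate over `count` consecutive sequence numbers.
    static List<Boolean> decisions(int firstSequenceNumber, int count, Integer lastOvertaken) {
        List<Boolean> result = new ArrayList<>(count);
        int seq = firstSequenceNumber;
        for (int i = 0; i < count; i++) {
            result.add(shouldBeSpilled(seq, lastOvertaken));
            seq++; // wraps from Integer.MAX_VALUE to Integer.MIN_VALUE
        }
        return result;
    }

    // True if the decisions never switch back to true after a false,
    // i.e. the [true, false, true] pattern does not occur.
    static boolean isTruePrefixOnly(List<Boolean> decisions) {
        boolean seenFalse = false;
        for (boolean d : decisions) {
            if (d && seenFalse) {
                return false;
            }
            if (!d) {
                seenFalse = true;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int first = Integer.MAX_VALUE - 2;
        int lastOvertaken = first + 3; // overflows to Integer.MIN_VALUE
        List<Boolean> d = decisions(first, 6, lastOvertaken);
        System.out.println(d + " prefix-only: " + isTruePrefixOnly(d));
    }
}
```

       With the values in `main`, the first four sequence numbers (up to and including the wrapped one) are spilled and the remaining two are not. If the real predicate behaves like this, the loop could stop at the first `false` instead of scanning the whole queue.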




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

