AHeise commented on a change in pull request #14052:
URL: https://github.com/apache/flink/pull/14052#discussion_r525865289



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -506,43 +514,75 @@ public void checkpointStarted(CheckpointBarrier barrier) {
                synchronized (receivedBuffers) {
                        channelStatePersister.startPersisting(
                                barrier.getId(),
-                               getInflightBuffers(numBuffersOvertaken == ALL ? receivedBuffers.getNumUnprioritizedElements() : numBuffersOvertaken));
+                               getInflightBuffers());
                }
        }
 
        public void checkpointStopped(long checkpointId) {
                synchronized (receivedBuffers) {
                        channelStatePersister.stopPersisting(checkpointId);
-                       numBuffersOvertaken = ALL;
+                       lastOvertakenSequenceNumber = null;
+               }
+       }
+
+       @VisibleForTesting
+       List<Buffer> getInflightBuffers() {
+               synchronized (receivedBuffers) {
+                       return getInflightBuffersUnsafe();
                }
        }
 
        /**
         * Returns a list of buffers, checking the first n non-priority buffers, and skipping all events.
         */
-       private List<Buffer> getInflightBuffers(int numBuffers) {
+       private List<Buffer> getInflightBuffersUnsafe() {
                assert Thread.holdsLock(receivedBuffers);
 
-               if (numBuffers == 0) {
-                       return Collections.emptyList();
-               }
-
-               final List<Buffer> inflightBuffers = new ArrayList<>(numBuffers);
+               final List<Buffer> inflightBuffers = new ArrayList<>();
                Iterator<SequenceBuffer> iterator = receivedBuffers.iterator();
                // skip all priority events (only buffers are stored anyways)
                Iterators.advance(iterator, receivedBuffers.getNumPriorityElements());
 
-               // spill number of overtaken buffers or all of them if barrier has not been seen yet
-               for (int pos = 0; pos < numBuffers; pos++) {
-                       Buffer buffer = iterator.next().buffer;
-                       if (buffer.isBuffer()) {
-                               inflightBuffers.add(buffer.retainBuffer());
+               while (iterator.hasNext()) {
+                       SequenceBuffer sequenceBuffer = iterator.next();
+                       if (sequenceBuffer.buffer.isBuffer() && shouldBeSpilled(sequenceBuffer.sequenceNumber)) {
+                               inflightBuffers.add(sequenceBuffer.buffer.retainBuffer());
                        }
                }
 
+               lastOvertakenSequenceNumber = null;
+
                return inflightBuffers;
        }
 
+       /**
+        * @return if given {@param sequenceNumber} should be spilled given {@link #lastOvertakenSequenceNumber}.
+        * We might not have yet received {@link CheckpointBarrier} and we might need to spill everything.
+        * If we have already received it, there is a bit nasty corner case of {@link SequenceBuffer#sequenceNumber}
+        * overflowing that needs to be handled as well.
+        */
+       private boolean shouldBeSpilled(int sequenceNumber) {
+               if (lastOvertakenSequenceNumber == null) {
+                       return true;
+               }
+               checkState(
+                       receivedBuffers.size() < Integer.MAX_VALUE / 2,
+                       "Too many buffers for sequenceNumber overflow detection code to work correctly");
+
+               boolean possibleOverflowAfterOvertaking = Integer.MAX_VALUE / 2 < lastOvertakenSequenceNumber;
+               boolean possibleOverflowBeforeOvertaking = lastOvertakenSequenceNumber < -Integer.MAX_VALUE / 2;
+
+               if (possibleOverflowAfterOvertaking) {
+                       return sequenceNumber < lastOvertakenSequenceNumber && sequenceNumber > 0;
+               }
+               else if (possibleOverflowBeforeOvertaking) {
+                       return sequenceNumber < lastOvertakenSequenceNumber || sequenceNumber > 0;
+               }
+               else {
+                       return sequenceNumber < lastOvertakenSequenceNumber;
+               }

Review comment:
       If this is the only place that needs overflow-aware comparison, we can leave it as is. We just need to keep an eye on other code that compares sequence numbers (beyond the strict increment in `RemoteInputChannel`) and pull the logic out into a shared helper once a second use appears.
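
For reference, a minimal sketch of what pulling the logic out could look like if a second call site shows up. The class name `SequenceNumbers` and the method name `isBeforeOvertaken` are hypothetical and not part of this PR; the branches mirror `shouldBeSpilled` from the diff above, except that the `receivedBuffers.size()` check is assumed to stay with the caller.

```java
/** Hypothetical shared helper; name and placement are assumptions, not part of the PR. */
final class SequenceNumbers {

    private SequenceNumbers() {}

    /**
     * Mirrors the branches of shouldBeSpilled from the diff.
     *
     * @param lastOvertaken value of lastOvertakenSequenceNumber, or null if the
     *                      barrier has not been received yet
     * @param sequenceNumber sequence number of the buffer under test
     * @return true if the buffer should be spilled
     */
    static boolean isBeforeOvertaken(Integer lastOvertaken, int sequenceNumber) {
        if (lastOvertaken == null) {
            // Barrier not seen yet: everything may need to be spilled.
            return true;
        }
        int last = lastOvertaken;
        if (Integer.MAX_VALUE / 2 < last) {
            // last is in the upper positive half; numbers issued after it
            // may already have wrapped around to negative values.
            return sequenceNumber < last && sequenceNumber > 0;
        } else if (last < -Integer.MAX_VALUE / 2) {
            // last itself has wrapped; numbers issued before it may still
            // be large positive values.
            return sequenceNumber < last || sequenceNumber > 0;
        } else {
            // No wrap-around possible near last: plain comparison.
            return sequenceNumber < last;
        }
    }
}
```

With something like this in place, `shouldBeSpilled` would reduce to the `checkState` call followed by `SequenceNumbers.isBeforeOvertaken(lastOvertakenSequenceNumber, sequenceNumber)`. Whether that is worth it depends on a second call site actually existing.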




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

