AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r492595313



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -89,17 +96,15 @@
        /** The number of available buffers that have not been announced to the producer yet. */
        private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
 
-       /**
-        * The latest already triggered checkpoint id which would be updated during
-        * {@link #spillInflightBuffers(long, ChannelStateWriter)}.
-        */
-       @GuardedBy("receivedBuffers")
-       private long lastRequestedCheckpointId = -1;
+       private final BufferManager bufferManager;
 
-       /** The current received checkpoint id from the network. */
-       private long receivedCheckpointId = -1;
+       /** Stores #overtaken buffers when a checkpoint barrier is received before task thread started checkpoint. */
+       @GuardedBy("receivedBuffers")
+       private Map<Long, Integer> numBuffersOvertaken = new HashMap<>();
 
-       private final BufferManager bufferManager;
+       /** All started checkpoints where a barrier has not been received yet. */
+       @GuardedBy("receivedBuffers")
+       private Deque<Long> pendingCheckpointBarriers = new ArrayDeque<>(2);

Review comment:
       Changed it to support only one concurrent checkpoint. I also extracted 
the whole logic into a helper class that both `LocalInputChannel` and 
`RemoteInputChannel` can use (with synchronization).
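
       For illustration only, a minimal sketch of what tracking a single 
in-flight checkpoint could look like. The class name 
`SingleCheckpointTracker` and its methods are hypothetical and are not the 
helper class from this PR; the sketch assumes checkpoint ids increase 
monotonically and uses `synchronized` methods in place of the 
`receivedBuffers` lock.

```java
/**
 * Hypothetical sketch, not the helper class from the PR.
 * Tracks at most one started checkpoint and tells the channel whether
 * incoming buffers still precede that checkpoint's barrier.
 */
final class SingleCheckpointTracker {
    private static final long NONE = -1L;

    /** Latest checkpoint started by the task thread. */
    private long startedCheckpointId = NONE;

    /** Latest barrier id seen on this channel. */
    private long lastBarrierId = NONE;

    /** True while buffers arriving on this channel belong to the started checkpoint. */
    private boolean persisting = false;

    /** Called by the task thread; a newer checkpoint replaces the previous one. */
    synchronized void startCheckpoint(long checkpointId) {
        if (checkpointId <= startedCheckpointId) {
            return; // stale or duplicate request
        }
        startedCheckpointId = checkpointId;
        // If the barrier already arrived, there is nothing left to persist.
        persisting = lastBarrierId < checkpointId;
    }

    /** Called when a barrier arrives on this channel. */
    synchronized void onBarrier(long checkpointId) {
        if (checkpointId > lastBarrierId) {
            lastBarrierId = checkpointId;
        }
        if (checkpointId >= startedCheckpointId) {
            persisting = false;
        }
    }

    /** Called for each received buffer to decide whether it must be persisted. */
    synchronized boolean shouldPersist() {
        return persisting;
    }
}
```

       Because only one checkpoint is tracked, the `Map` and `Deque` fields 
from the hunk above collapse into a few scalar fields in this sketch.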



