rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r543278831



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
##########
@@ -193,6 +202,11 @@ private CheckpointBarrierBehaviourController chooseController(CheckpointBarrier
 
        private boolean canTimeout(CheckpointBarrier barrier) {
                return barrier.getCheckpointOptions().isTimeoutable() &&
-                       barrier.getCheckpointOptions().getAlignmentTimeout() < (System.currentTimeMillis() - barrier.getTimestamp());
+                       barrier.getId() <= lastSeenBarrier &&
+                       barrier.getCheckpointOptions().getAlignmentTimeout() * 1_000_000 < (System.nanoTime() - firstBarrierArrivalTime);
+       }
+
+       private long getArrivalTime(CheckpointBarrier announcedBarrier) {
+               return announcedBarrier.getCheckpointOptions().isTimeoutable() ? System.nanoTime() : Long.MAX_VALUE;

Review comment:
       With a single input, this is not an alignment issue but a related 
back-pressure issue, and I think this is the right way to solve it: 
   > Active timeout would alleviate this problem though.
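
For reference, the passive check in the hunk above can be sketched in isolation as below. This is only an illustration under stated assumptions: the class `AlignmentTimeoutSketch`, its parameter names, and the injected `nowNanos` clock value are hypothetical and not part of the PR; the barrier and its options are flattened into plain arguments so the snippet compiles without Flink on the classpath.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-alone sketch of the timeout check from the diff; not Flink code. */
final class AlignmentTimeoutSketch {

    /** Mirrors getArrivalTime: record the clock only for timeoutable barriers. */
    static long arrivalTime(boolean timeoutable, long nowNanos) {
        return timeoutable ? nowNanos : Long.MAX_VALUE;
    }

    /**
     * Mirrors canTimeout: the barrier must be timeoutable, must not be newer than
     * the last barrier seen, and the alignment timeout (given in milliseconds)
     * must be shorter than the time elapsed since the first barrier arrived.
     */
    static boolean canTimeout(
            boolean timeoutable,
            long barrierId,
            long lastSeenBarrier,
            long alignmentTimeoutMillis,
            long firstBarrierArrivalNanos,
            long nowNanos) {
        return timeoutable
                && barrierId <= lastSeenBarrier
                // Same unit conversion as "* 1_000_000" in the diff.
                && TimeUnit.MILLISECONDS.toNanos(alignmentTimeoutMillis)
                        < nowNanos - firstBarrierArrivalNanos;
    }

    public static void main(String[] args) {
        long arrival = arrivalTime(true, 0L);
        // 10 ms timeout, 15 ms elapsed: the barrier may time out.
        System.out.println(canTimeout(true, 1L, 1L, 10L, arrival, 15_000_000L));
        // 10 ms timeout, 5 ms elapsed: still within the alignment timeout.
        System.out.println(canTimeout(true, 1L, 1L, 10L, arrival, 5_000_000L));
    }
}
```

Note that a check of this shape only runs when something calls it, which is why it does not help while a single input is back-pressured and nothing arrives; an active timeout would not have that limitation.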




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org