Interesting problem! Thanks for bringing it up, Thomas.
Ignore/correct me if I am wrong, but I believe Chandy-Lamport snapshots [1] would help solve this problem more elegantly without sacrificing correctness.
- They do not need alignment, only (async) logging of in-flight records between the time the first barrier is processed and the time the last barrier arrives at a task.
- They work fine for failure recovery as long as the logged records are replayed on startup.
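To make the two points above concrete, here is a minimal Python sketch of Chandy-Lamport-style snapshotting inside a single task. It assumes a toy task that sums integers from several input channels, with only one snapshot in flight at a time; `Task`, `on_record`, `on_barrier` and `recover` are hypothetical names for illustration, not Flink APIs. No channel is ever blocked. The state is captured at the first barrier, and records arriving on channels whose barrier is still pending are logged into the snapshot while still being processed normally.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Set, Tuple


@dataclass
class Task:
    """Hypothetical task summing integer records from several input channels.

    Assumption: at most one snapshot is in progress at a time.
    """
    num_channels: int
    state: int = 0
    snapshot_state: Optional[int] = None      # state captured at the first barrier
    pending: Set[int] = field(default_factory=set)  # channels still awaiting their barrier
    logged: List[Tuple[int, int]] = field(default_factory=list)  # logged in-flight records
    completed: Optional[Tuple[int, List[Tuple[int, int]]]] = None  # finished snapshot

    def on_record(self, channel: int, value: int) -> None:
        # A record on a channel whose barrier has not arrived yet was "in flight"
        # when the snapshot started: log it, but keep processing (no alignment).
        if self.snapshot_state is not None and channel in self.pending:
            self.logged.append((channel, value))
        self.state += value

    def on_barrier(self, channel: int) -> None:
        if self.snapshot_state is None:
            # First barrier: snapshot the local state immediately.
            self.snapshot_state = self.state
            self.pending = set(range(self.num_channels)) - {channel}
        else:
            self.pending.discard(channel)
        if not self.pending:
            # Last barrier: snapshot = state at first barrier + logged records.
            self.completed = (self.snapshot_state, list(self.logged))
            self.snapshot_state, self.logged = None, []

    @staticmethod
    def recover(num_channels: int, snapshot: Tuple[int, List[Tuple[int, int]]]) -> "Task":
        # Restore the snapshotted state, then replay the logged in-flight records.
        state, logged = snapshot
        task = Task(num_channels, state=state)
        for _channel, value in logged:
            task.state += value
        return task


t = Task(2)
t.on_record(0, 1)
t.on_barrier(0)      # snapshot state = 1
t.on_record(0, 10)   # after channel 0's barrier: belongs to the next epoch, not logged
t.on_record(1, 5)    # before channel 1's barrier: logged and processed
t.on_barrier(1)      # completed == (1, [(1, 5)])
```

After `Task.recover(2, t.completed)` the restored state is 6; upstream would then replay only the records that followed each channel's barrier (here the `10` on channel 0), bringing the state back to the 16 reached before the failure.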
Flink’s “aligned” savepoints would probably still be necessary for transactional sink commits and any sort of reconfiguration (e.g., rescaling, or updating the logic of operators to evolve an application).
I don’t completely understand the “overtaking” approach, but if you have a concrete definition I would be happy to check it out and help if I can! Note that Chandy-Lamport essentially does this by logging the records of pending channels into the task snapshot until the barrier arrives on each of them.

-Paris

[1] https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm

> On 13 Aug 2019, at 10:27, Piotr Nowojski <pi...@ververica.com> wrote:
>
> Hi Thomas,
>
> As Zhijiang has responded, we are now in the process of discussing how to
> address this issue, and one of the solutions that we are discussing is exactly
> what you are proposing: checkpoint barriers overtaking the in-flight data and
> making the in-flight data part of the checkpoint.
>
> If everything works well, we will be able to present the results of our
> discussions on the dev mailing list soon.
>
> Piotrek
>
>> On 12 Aug 2019, at 23:23, zhijiang <wangzhijiang...@aliyun.com.INVALID>
>> wrote:
>>
>> Hi Thomas,
>>
>> Thanks for raising this concern. Barrier alignment takes a long time in
>> the backpressure case, which can cause several problems:
>> 1. Checkpoint timeouts, as you mentioned.
>> 2. High recovery cost on failover, because a lot of data needs to be
>> replayed.
>> 3. High commit latency for commit-based sinks in exactly-once mode.
>>
>> With credit-based flow control, introduced in release-1.5, the number of in-flight
>> buffers ahead of the barrier during alignment is reduced, so we get some
>> benefit in checkpointing speed.
>>
>> In release-1.8, I guess we did not, in practice, suspend the channels that had
>> already received the barrier. But we actually did something similar
>> to speed up barrier alignment before.
>> I am not quite sure whether release-1.8
>> covers this feature. There were some relevant discussions under JIRA [1].
>>
>> For release-1.10, the community is now discussing the feature of unaligned
>> checkpoints, which is mainly meant to resolve the above concerns. The basic idea
>> is to let the barrier overtake the output/input buffer queues to speed up
>> alignment, and to snapshot the input/output buffers as part of the checkpoint
>> state. The details have not been confirmed yet and are still under discussion.
>> I hope we can make some improvements for release-1.10.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-8523
>>
>> Best,
>> Zhijiang
>> ------------------------------------------------------------------
>> From:Thomas Weise <t...@apache.org>
>> Send Time:Monday, 12 August 2019, 21:38
>> To:dev <dev@flink.apache.org>
>> Subject:Checkpointing under backpressure
>>
>> Hi,
>>
>> One of the major operational difficulties we observe with Flink is
>> checkpoint timeouts under backpressure. I'm looking for both confirmation
>> of my understanding of the current behavior and pointers for future
>> improvement work:
>>
>> Prior to the introduction of credit-based flow control in the network stack [1]
>> [2], checkpoint barriers would back up with the data for all logical
>> channels due to TCP backpressure. Since Flink 1.5, the buffers are
>> controlled per channel, and checkpoint barriers are only held back for
>> channels that have backpressure, while others can continue processing
>> normally. However, checkpoint barriers still cannot "overtake data";
>> therefore, checkpoint alignment remains affected for the channel with
>> backpressure, with the potential for slow checkpointing and timeouts.
>> That said, the delay of barriers would be capped by the maximum number of
>> in-transit buffers per channel, which is an improvement compared to previous
>> versions of Flink.
>> Also, the backpressure-based checkpoint alignment can
>> help the barrier advance faster on the receiver side (by suspending
>> channels that have already delivered the barrier). Is that accurate as of
>> Flink 1.8?
>>
>> What appears to be missing to completely unblock checkpointing is a
>> mechanism for checkpoints to overtake the data. That would help in
>> situations where the processing itself is the bottleneck and prioritization
>> in the network stack alone cannot address the barrier delay. Was there any
>> related discussion? One possible solution would be to drain incoming data
>> up to the barrier and make it part of the checkpoint instead of processing
>> it. This is somewhat related to asynchronous processing, but I'm thinking
>> more of a solution that is automated in the Flink runtime for the
>> backpressure scenario only.
>>
>> Thanks,
>> Thomas
>>
>> [1] https://flink.apache.org/2019/06/05/flink-network-stack.html
>> [2] https://docs.google.com/document/d/1chTOuOqe0sBsjldA_r-wXYeSIhU2zRGpUaTaik7QZ84/edit#heading=h.pjh6mv7m2hjn
>>
>
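A rough Python sketch contrasting aligned checkpoints with the overtaking idea discussed in this thread (draining the data queued ahead of the barrier into the checkpoint instead of processing it first). It assumes a toy operator that sums integers, input queues modeled as `collections.deque` objects that each contain exactly one barrier, and a hypothetical `BARRIER` sentinel. The function names are illustrative only and do not describe Flink's actual implementation, whose details were still under discussion.

```python
from collections import deque
from typing import Deque, List, Tuple

BARRIER = object()  # hypothetical sentinel standing in for a checkpoint barrier


def aligned_checkpoint(state: int, queues: List[Deque[object]]) -> Tuple[int, int]:
    """Aligned: every record queued ahead of the barrier is processed first.

    Returns (snapshotted state, records processed before the snapshot).
    Under backpressure this count, and thus checkpoint latency, grows with the queues.
    Simplification: channels are drained one after another; a real task interleaves
    them and blocks the channels that have already delivered their barrier.
    """
    processed = 0
    for q in queues:
        while (item := q.popleft()) is not BARRIER:
            state += item
            processed += 1
    return state, processed


def unaligned_checkpoint(state: int, queues: List[Deque[object]]) -> Tuple[int, List[List[int]]]:
    """Overtaking: the barrier jumps ahead of the queued records.

    The snapshot is taken immediately; records that were ahead of the barrier are
    stored with the checkpoint as per-channel in-flight data instead of being
    processed first. They are put back so normal processing continues afterwards.
    """
    channel_state = []
    for q in queues:
        drained = []
        while (item := q.popleft()) is not BARRIER:
            drained.append(item)
        channel_state.append(drained)
        q.extendleft(reversed(drained))  # restore original order, barrier removed
    return state, channel_state


aligned = aligned_checkpoint(0, [deque([1, 2, BARRIER, 3]), deque([4, BARRIER])])
queues = [deque([1, 2, BARRIER, 3]), deque([4, BARRIER])]
unaligned = unaligned_checkpoint(0, queues)
```

With these queues, the aligned variant must process three records before it can snapshot state `7`, whereas the overtaking variant snapshots state `0` immediately and stores `[[1, 2], [4]]` as in-flight channel data, which would be replayed on recovery.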