Hi all!

This is an interesting discussion for sure.

Concerning user requests for changes modes, I also hear the following quite
often:
  - reduce the expensiveness of checkpoint alignment (unaligned
checkpoints) to make checkpoints fast/stable under high backpressure
  - more fine-grained failover while maintaining exactly-once (even if
costly)

Having also "at most once" to the mix is quite a long list of big changes
to the system.

My feeling is that on such a core system, the community can not push all
these efforts at the same time, especially because they touch overlapping
areas of the system and need the same committers involved.

On the other hand, the pluggable shuffle service and pluggable scheduler
could make it possible to have an external implementation of that.
  - of a network stack that supports "reconnects" of failed tasks with
continuing tasks
  - a scheduling strategy that restarts tasks individually even in
pipelined regions

I think contributors/committers could implements this separate from the
Flink core. The feature would be trial-run it through the community
packages. If it gains a lot of traction, the community could decide to put
in the effort to merge this into the core.

Best,
Stephan


On Tue, Jun 11, 2019 at 2:10 PM SHI Xiaogang <shixiaoga...@gmail.com> wrote:

> Hi All,
>
> It definitely requires a massive effort to allow at-most-once delivery in
> Flink. But as the feature is urgently demanded by many Flink users, i think
> every effort we made is worthy. Actually, the inability to support
> at-most-once delivery has become a major obstacle for Storm users to turn
> to Flink. It's undesirable for us to run different stream processing
> systems for different scenarios.
>
> I agree with Zhu Zhu that the guarantee we provide is the very first thing
> to be discussed. Recovering with checkpoints will lead to duplicated
> records, thus break the guarantee on at-most-once delivery.
>
> A method to achieve at-most-once guarantee is to completely disable
> checkpointing and let sources only read those records posted after they
> start. The method requires sources to allow the configuration to read
> latest records, which luckily is supported by many message queues including
> Kafka. As Flink relies sources' ability to rollback to achieve exact-only
> and at-least-once delivery, i think it's acceptable for Flink to rely
> sources' ability to read latest records to achieve at-most once delivery.
> This method does not require any modification to existing checkpointing
> mechanism. Besides, as there is no need to restoring from checkpoints,
> failed tasks can recover themselves at the fastest speed.
>
> Concerning the implementation efforts, i think we can benefit from some
> ongoing work including shuffle services and fine-grained recovery. For
> example, currently the exceptions in network connections will lead to
> failures of downstream and upstream tasks. To achieve at-most-once
> delivery, we should decouple intermediate results from tasks, reporting the
> exceptions of intermediate results to job master and letting the failover
> strategy to determine the actions taken. Some work is already done in the
> efforts to achieve fine-grained recovery, which can be extended to allow
> at-most-once delivery in Flink.
>
> But before starting the discussion on implementation details, as said at
> prior, we need to determine the guarantee we provide in the scenarios where
> timely recovery is needed.
> * What do you think of the at-most-once guarantee achieved by the proposed
> method?
> * Do we need checkpointing to reduce the amount of lost data?
> * Do we need deduplication to guarantee at-most-once delivery or just
> provide best-effort delivery?
>
> Regards,
> Xiaogang Shi
>
>
> Piotr Nowojski <pi...@ververica.com> 于2019年6月11日周二 下午5:31写道:
>
> > Hi Xiaogang,
> >
> > It sounds interesting and definitely a useful feature, however the
> > questions for me would be how useful, how much effort would it require
> and
> > is it worth it? We simply can not do all things at once, and currently
> > people that could review/drive/mentor this effort are pretty much
> strained
> > :( For me one would have to investigate answers to those questions and
> > prioritise it compared to other ongoing efforts, before I could vote +1
> for
> > this.
> >
> > Couple of things to consider:
> > - would it be only a job manager/failure region recovery feature?
> > - would it require changes in CheckpointBarrierHandler,
> > CheckpointCoordinator classes?
> > - with `at-most-once` semantic theoretically speaking we could just drop
> > the current `CheckpointBarrier` handling/injecting code and avoid all of
> > the checkpoint alignment issues - we could just checkpoint all of the
> tasks
> > independently of one another. However maybe that could be a follow up
> > optimisation step?
> >
> > Piotrek
> >
> > > On 11 Jun 2019, at 10:53, Zili Chen <wander4...@gmail.com> wrote:
> > >
> > > Hi Xiaogang,
> > >
> > > It is an interesting topic.
> > >
> > > Notice that there is some effort to build a mature mllib of flink these
> > > days, it could be also possible for some ml cases trade off correctness
> > for
> > > timeliness or throughput. Excatly-once delivery excatly makes flink
> stand
> > > out but an at-most-once option would adapt flink to more scenarios.
> > >
> > > Best,
> > > tison.
> > >
> > >
> > > SHI Xiaogang <shixiaoga...@gmail.com> 于2019年6月11日周二 下午4:33写道:
> > >
> > >> Flink offers a fault-tolerance mechanism to guarantee at-least-once
> and
> > >> exactly-once message delivery in case of failures. The mechanism works
> > well
> > >> in practice and makes Flink stand out among stream processing systems.
> > >>
> > >> But the guarantee on at-least-once and exactly-once delivery does not
> > come
> > >> without price. It typically requires to restart multiple tasks and
> fall
> > >> back to the place where the last checkpoint is taken. (Fined-grained
> > >> recovery can help alleviate the cost, but it still needs certain
> > efforts to
> > >> recover jobs.)
> > >>
> > >> In some senarios, users perfer quick recovery and will trade
> correctness
> > >> off. For example, in some online recommendation systems, timeliness is
> > far
> > >> more important than consistency. In such cases, we can restart only
> > those
> > >> failed tasks individually, and do not need to perform any rollback.
> > Though
> > >> some messages delivered to failed tasks may be lost, other tasks can
> > >> continuously provide service to users.
> > >>
> > >> Many of our users are demanding for at-most-once delivery in Flink.
> > What do
> > >> you think of the proposal? Any feedback is appreciated.
> > >>
> > >> Regards,
> > >> Xiaogang Shi
> > >>
> >
> >
>

Reply via email to