Core issue here is that there is no explicit concept of 'checkpoint' in Beam (UnboundedSource has a method 'getCheckpointMark' but that refers to the checkoint on external source). Runners do checkpoint internally as implementation detail. Flink's checkpoint model is entirely different from Dataflow's and Spark's.
@StableReplay helps, but it does not explicitly talk about a checkpoint by design. If you are looking to achieve some guarantees with a sink/DoFn, I think it is better to start with the requirements. I worked on exactly-once sink for Kafka (see KafkaIO.write().withEOS()), where we essentially reshard the elements and assign sequence numbers to elements with in each shard. Duplicates in replays are avoided based on these sequence numbers. DoFn state API is used to buffer out-of order replays. The implementation strategy works in Dataflow but not in Flink which has a horizontal checkpoint. KafkaIO checks for compatibility. On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau <[email protected]> wrote: > Hi guys, > > The subject is a bit provocative but the topic is real and coming > again and again with the beam usage: how a dofn can handle some > "chunking". > > The need is to be able to commit each N records but with N not too big. > > The natural API for that in beam is the bundle one but bundles are not > reliable since they can be very small (flink) - we can say it is "ok" > even if it has some perf impacts - or too big (spark does full size / > #workers). > > The workaround is what we see in the ES I/O: a maxSize which does an > eager flush. The issue is that then the checkpoint is not respected > and you can process multiple times the same records. > > Any plan to make this API reliable and controllable from a beam point > of view (at least in a max manner)? > > Thanks, > Romain Manni-Bucau > @rmannibucau | Blog | Old Blog | Github | LinkedIn >
