To avoid spending a lot of time pursuing a false path, I'd like to say straight up that SDF is definitely not going to help here, despite the fact that its API includes the term "checkpoint". In SDF, the "checkpoint" captures the state of processing within a single element. If you're applying an SDF to 1000 elements, it will, like any other DoFn, be applied to each of them independently and in parallel, and you'll have 1000 checkpoints capturing the state of processing each of these elements, which is probably not what you want.
I'm afraid I still don't understand what kind of checkpoint you need, if it is not just deterministic grouping into batches. "Checkpoint" is a very broad term and it's very possible that everybody in this thread is talking about different things when saying it. So it would help if you could give a more concrete example: for example, take some IO that you think could be easier to write with your proposed API, give the contents of a hypothetical PCollection being written to this IO, give the code of a hypothetical DoFn implementing the write using your API, and explain what you'd expect to happen at runtime. On Thu, Nov 16, 2017 at 10:33 PM Romain Manni-Bucau <[email protected]> wrote: > @Eugene: yes and the other alternative of Reuven too but it is still > 1. relying on timers, 2. not really checkpointed > > In other words it seems all solutions are to create a chunk of size 1 > and replayable to fake the lack of chunking in the framework. This > always implies a chunk handling outside the component (typically > before for an output). My point is I think IO need it in their own > "internal" or at least control it themselves since the chunk size is > part of the IO handling most of the time. > > I think JB spoke of the same "group before" trick using restrictions > which can work I have to admit if SDF are implemented by runners. Is > there a roadmap/status on that? Last time I checked SDF was a great > API without support :(. > > > > Romain Manni-Bucau > @rmannibucau | Blog | Old Blog | Github | LinkedIn > > > 2017-11-17 7:25 GMT+01:00 Eugene Kirpichov <[email protected]>: > > JB, not sure what you mean? SDFs and triggers are unrelated, and the post > > doesn't mention the word. Did you mean something else, e.g. restriction > > perhaps? Either way I don't think SDFs are the solution here; SDFs have > to > > do with the ability to split the processing of *a single element* over > > multiple calls, whereas Romain I think is asking for repeatable grouping > of > > *multiple* elements. > > > > Romain - does > > > https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java > > do what > > you want? > > > > On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré <[email protected]> > > wrote: > > > >> It sounds like the "Trigger" in the Splittable DoFn, no ? > >> > >> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html > >> > >> Regards > >> JB > >> > >> > >> On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote: > >> > it gives the fn/transform the ability to save a state - it can get > >> > back on "restart" / whatever unit we can use, probably runner > >> > dependent? Without that you need to rewrite all IO usage with > >> > something like the previous pattern which makes the IO not self > >> > sufficient and kind of makes the entry cost and usage of beam way > >> > further. > >> > > >> > In my mind it is exactly what jbatch/spring-batch uses but adapted to > >> > beam (stream in particular) case. > >> > > >> > Romain Manni-Bucau > >> > @rmannibucau | Blog | Old Blog | Github | LinkedIn > >> > > >> > > >> > 2017-11-17 6:49 GMT+01:00 Reuven Lax <[email protected]>: > >> >> Romain, > >> >> > >> >> Can you define what you mean by checkpoint? What are the semantics, > what > >> >> does it accomplish? > >> >> > >> >> Reuven > >> >> > >> >> On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau < > >> [email protected]> > >> >> wrote: > >> >> > >> >>> Yes, what I propose earlier was: > >> >>> > >> >>> I. checkpoint marker: > >> >>> > >> >>> @AnyBeamAnnotation > >> >>> @CheckpointAfter > >> >>> public void someHook(SomeContext ctx); > >> >>> > >> >>> > >> >>> II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new > >> >>> CountingAlgo())) > >> >>> > >> >>> III. (I like this one less) > >> >>> > >> >>> // in the dofn > >> >>> @CheckpointTester > >> >>> public boolean shouldCheckpoint(); > >> >>> > >> >>> IV. @Checkpointer Serializable getCheckpoint(); in the dofn per > element > >> >>> > >> >>> > >> >>> > >> >>> > >> >>> Romain Manni-Bucau > >> >>> @rmannibucau | Blog | Old Blog | Github | LinkedIn > >> >>> > >> >>> > >> >>> 2017-11-17 6:06 GMT+01:00 Raghu Angadi <[email protected] > >: > >> >>>> How would you define it (rough API is fine)?. Without more details, > >> it is > >> >>>> not easy to see wider applicability and feasibility in runners. > >> >>>> > >> >>>> On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau < > >> >>> [email protected]> > >> >>>> wrote: > >> >>>> > >> >>>>> This is a fair summary of the current state but also where beam > can > >> >>> have a > >> >>>>> very strong added value and make big data great and smooth. > >> >>>>> > >> >>>>> Instead of this replay feature isnt checkpointing willable? In > >> >>> particular > >> >>>>> with SDF no? > >> >>>>> > >> >>>>> > >> >>>>> Le 16 nov. 2017 19:50, "Raghu Angadi" <[email protected]> > a > >> >>>>> écrit : > >> >>>>> > >> >>>>>> Core issue here is that there is no explicit concept of > 'checkpoint' > >> >>> in > >> >>>>>> Beam (UnboundedSource has a method 'getCheckpointMark' but that > >> >>> refers to > >> >>>>>> the checkoint on external source). Runners do checkpoint > internally > >> as > >> >>>>>> implementation detail. Flink's checkpoint model is entirely > >> different > >> >>>>> from > >> >>>>>> Dataflow's and Spark's. > >> >>>>>> > >> >>>>>> @StableReplay helps, but it does not explicitly talk about a > >> >>> checkpoint > >> >>>>> by > >> >>>>>> design. > >> >>>>>> > >> >>>>>> If you are looking to achieve some guarantees with a sink/DoFn, I > >> >>> think > >> >>>>> it > >> >>>>>> is better to start with the requirements. I worked on > exactly-once > >> >>> sink > >> >>>>> for > >> >>>>>> Kafka (see KafkaIO.write().withEOS()), where we essentially > reshard > >> >>> the > >> >>>>>> elements and assign sequence numbers to elements with in each > shard. > >> >>>>>> Duplicates in replays are avoided based on these sequence > numbers. > >> >>> DoFn > >> >>>>>> state API is used to buffer out-of order replays. The > implementation > >> >>>>>> strategy works in Dataflow but not in Flink which has a > horizontal > >> >>>>>> checkpoint. KafkaIO checks for compatibility. > >> >>>>>> > >> >>>>>> On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau < > >> >>>>>> [email protected]> > >> >>>>>> wrote: > >> >>>>>> > >> >>>>>>> Hi guys, > >> >>>>>>> > >> >>>>>>> The subject is a bit provocative but the topic is real and > coming > >> >>>>>>> again and again with the beam usage: how a dofn can handle some > >> >>>>>>> "chunking". > >> >>>>>>> > >> >>>>>>> The need is to be able to commit each N records but with N not > too > >> >>> big. > >> >>>>>>> > >> >>>>>>> The natural API for that in beam is the bundle one but bundles > are > >> >>> not > >> >>>>>>> reliable since they can be very small (flink) - we can say it is > >> >>> "ok" > >> >>>>>>> even if it has some perf impacts - or too big (spark does full > size > >> >>> / > >> >>>>>>> #workers). > >> >>>>>>> > >> >>>>>>> The workaround is what we see in the ES I/O: a maxSize which > does > >> an > >> >>>>>>> eager flush. The issue is that then the checkpoint is not > respected > >> >>>>>>> and you can process multiple times the same records. > >> >>>>>>> > >> >>>>>>> Any plan to make this API reliable and controllable from a beam > >> >>> point > >> >>>>>>> of view (at least in a max manner)? > >> >>>>>>> > >> >>>>>>> Thanks, > >> >>>>>>> Romain Manni-Bucau > >> >>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn > >> >>>>>>> > >> >>>>>> > >> >>>>> > >> >>> > >> > >> -- > >> Jean-Baptiste Onofré > >> [email protected] > >> http://blog.nanthrax.net > >> Talend - http://www.talend.com > >> >
