@Eugene: yes, and so does Reuven's other alternative, but both still 1. rely on timers and 2. are not really checkpointed.
In other words, it seems all the solutions create a replayable chunk of size 1 to fake the lack of chunking in the framework. This always implies handling the chunk outside the component (typically before it, for an output). My point is that I think IOs need this internally, or at least need to control it themselves, since the chunk size is part of the IO handling most of the time. I think JB was referring to the same "group before" trick using restrictions, which I have to admit can work if runners implement SDF. Is there a roadmap/status for that? Last time I checked, SDF was a great API without support :(.

Romain Manni-Bucau
@rmannibucau | Blog | Old Blog | Github | LinkedIn

2017-11-17 7:25 GMT+01:00 Eugene Kirpichov <[email protected]>:
> JB, not sure what you mean? SDFs and triggers are unrelated, and the post
> doesn't mention the word. Did you mean something else, e.g. restriction
> perhaps? Either way, I don't think SDFs are the solution here; SDFs have to
> do with the ability to split the processing of *a single element* over
> multiple calls, whereas I think Romain is asking for repeatable grouping of
> *multiple* elements.
>
> Romain - does
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
> do what you want?
>
> On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré <[email protected]>
> wrote:
>
>> It sounds like the "Trigger" in the Splittable DoFn, no?
>>
>> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
>>
>> Regards
>> JB
>>
>>
>> On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote:
>> > It gives the fn/transform the ability to save a state that it can get
>> > back on "restart" / whatever unit we can use, probably runner
>> > dependent? Without that, you need to rewrite all IO usage with
>> > something like the previous pattern, which makes the IO not
>> > self-sufficient and considerably raises the entry cost of using Beam.
>> >
>> > In my mind, it is exactly what jbatch/spring-batch uses, but adapted to
>> > the Beam (streaming in particular) case.
>> >
>> > Romain Manni-Bucau
>> >
>> >
>> > 2017-11-17 6:49 GMT+01:00 Reuven Lax <[email protected]>:
>> >> Romain,
>> >>
>> >> Can you define what you mean by checkpoint? What are the semantics, what
>> >> does it accomplish?
>> >>
>> >> Reuven
>> >>
>> >> On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau <[email protected]>
>> >> wrote:
>> >>
>> >>> Yes, what I proposed earlier was:
>> >>>
>> >>> I. checkpoint marker:
>> >>>
>> >>> @AnyBeamAnnotation
>> >>> @CheckpointAfter
>> >>> public void someHook(SomeContext ctx);
>> >>>
>> >>>
>> >>> II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new
>> >>> CountingAlgo()))
>> >>>
>> >>> III. (I like this one less)
>> >>>
>> >>> // in the dofn
>> >>> @CheckpointTester
>> >>> public boolean shouldCheckpoint();
>> >>>
>> >>> IV. @Checkpointer Serializable getCheckpoint(); in the dofn per element
>> >>>
>> >>>
>> >>> Romain Manni-Bucau
>> >>>
>> >>>
>> >>> 2017-11-17 6:06 GMT+01:00 Raghu Angadi <[email protected]>:
>> >>>> How would you define it (a rough API is fine)? Without more details, it is
>> >>>> not easy to see wider applicability and feasibility in runners.
>> >>>>
>> >>>> On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau <[email protected]>
>> >>>> wrote:
>> >>>>
>> >>>>> This is a fair summary of the current state, but also of where Beam can
>> >>>>> add very strong value and make big data great and smooth.
>> >>>>>
>> >>>>> Instead of this replay feature, isn't checkpointing feasible? In
>> >>>>> particular with SDF, no?
>> >>>>>
>> >>>>>
>> >>>>> On 16 Nov 2017 19:50, "Raghu Angadi" <[email protected]> wrote:
>> >>>>>
>> >>>>>> The core issue here is that there is no explicit concept of 'checkpoint'
>> >>>>>> in Beam (UnboundedSource's reader has a method 'getCheckpointMark', but
>> >>>>>> that refers to the checkpoint on the external source). Runners do
>> >>>>>> checkpoint internally as an implementation detail. Flink's checkpoint
>> >>>>>> model is entirely different from Dataflow's and Spark's.
>> >>>>>>
>> >>>>>> @StableReplay helps, but by design it does not explicitly talk about a
>> >>>>>> checkpoint.
>> >>>>>>
>> >>>>>> If you are looking to achieve some guarantees with a sink/DoFn, I think
>> >>>>>> it is better to start with the requirements. I worked on an exactly-once
>> >>>>>> sink for Kafka (see KafkaIO.write().withEOS()), where we essentially
>> >>>>>> reshard the elements and assign sequence numbers to elements within each
>> >>>>>> shard. Duplicates in replays are avoided based on these sequence numbers.
>> >>>>>> The DoFn state API is used to buffer out-of-order replays. The
>> >>>>>> implementation strategy works in Dataflow but not in Flink, which has a
>> >>>>>> horizontal checkpoint. KafkaIO checks for compatibility.
>> >>>>>>
>> >>>>>> On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau <[email protected]>
>> >>>>>> wrote:
>> >>>>>>
>> >>>>>>> Hi guys,
>> >>>>>>>
>> >>>>>>> The subject is a bit provocative, but the topic is real and keeps coming
>> >>>>>>> up again and again in Beam usage: how can a DoFn handle some "chunking"?
>> >>>>>>>
>> >>>>>>> The need is to be able to commit every N records, with N not too big.
>> >>>>>>>
>> >>>>>>> The natural API for that in Beam is the bundle one, but bundles are not
>> >>>>>>> reliable since they can be very small (Flink), which we can say is "ok"
>> >>>>>>> even if it has some performance impact, or too big (Spark uses full size
>> >>>>>>> / #workers).
>> >>>>>>>
>> >>>>>>> The workaround is what we see in the ES I/O: a maxSize which triggers an
>> >>>>>>> eager flush. The issue is that the checkpoint is then not respected, and
>> >>>>>>> the same records can be processed multiple times.
>> >>>>>>>
>> >>>>>>> Is there any plan to make this API reliable and controllable from Beam's
>> >>>>>>> point of view (at least as an upper bound)?
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>> Romain Manni-Bucau
>>
>> --
>> Jean-Baptiste Onofré
>> [email protected]
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
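
Eugene's GroupIntoBatches pointer is about buffering *multiple* elements per key and emitting them downstream as one batch. The idea can be shown roughly with a stdlib-only sketch. It is not Beam's code. `KeyedBatcher`, `add` and `flushAll` are hypothetical names, and the buffer is plain memory rather than runner-managed state:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical plain-Java illustration of per-key, count-based batching.
// NOT Beam's GroupIntoBatches: in Beam the buffer would live in per-key
// state managed by the runner instead of an in-memory map.
class KeyedBatcher<K, V> {
    private final int batchSize;
    private final BiConsumer<K, List<V>> emit;
    // LinkedHashMap keeps flush order deterministic (first-seen key first).
    private final Map<K, List<V>> buffers = new LinkedHashMap<>();

    KeyedBatcher(int batchSize, BiConsumer<K, List<V>> emit) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.emit = emit;
    }

    // Buffer one element; emit a full batch as soon as its key reaches batchSize.
    void add(K key, V value) {
        List<V> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() >= batchSize) {
            emit.accept(key, new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    // Emit leftover partial batches, e.g. when a window closes or a flush timer fires.
    void flushAll() {
        for (Map.Entry<K, List<V>> entry : buffers.entrySet()) {
            if (!entry.getValue().isEmpty()) {
                emit.accept(entry.getKey(), new ArrayList<>(entry.getValue()));
                entry.getValue().clear();
            }
        }
    }

    public static void main(String[] args) {
        KeyedBatcher<String, Integer> batcher =
            new KeyedBatcher<>(2, (key, batch) -> System.out.println(key + "=" + batch));
        batcher.add("a", 1);
        batcher.add("a", 2); // prints a=[1, 2]
        batcher.add("b", 3);
        batcher.add("a", 4);
        batcher.flushAll();  // prints a=[4], then b=[3]
    }
}
```

Here a crash would lose the buffered elements. That is the gap Romain points at when he says the grouping has to happen outside the IO and is "not really checkpointed" from the IO's own point of view.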
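
Raghu's description of the exactly-once Kafka sink can be illustrated with a small in-memory sketch. It involves resharding, numbering elements within each shard, dropping replays by sequence number, and buffering out-of-order arrivals. This is an illustration under stated assumptions, not KafkaIO's implementation. `SequencedShardWriter`, `receive` and `committed` are hypothetical, and plain maps stand in for the DoFn state Raghu mentions:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of per-shard sequence-number dedup. NOT KafkaIO's code:
// in Beam, nextSeq and pending would be DoFn state, and "commit" would be a
// write to the external system.
class SequencedShardWriter {
    // Next sequence number each shard is allowed to commit.
    private final Map<Integer, Long> nextSeq = new HashMap<>();
    // Out-of-order elements held back until the gap before them is filled.
    private final Map<Integer, TreeMap<Long, String>> pending = new HashMap<>();
    private final List<String> committed = new ArrayList<>();

    void receive(int shard, long seq, String record) {
        long expected = nextSeq.getOrDefault(shard, 0L);
        if (seq < expected) {
            return; // already committed, so this is a replayed duplicate
        }
        TreeMap<Long, String> buffer = pending.computeIfAbsent(shard, s -> new TreeMap<>());
        buffer.putIfAbsent(seq, record); // a duplicate still in the buffer is ignored
        // Commit the contiguous run starting at the expected sequence number.
        while (!buffer.isEmpty() && buffer.firstKey() == expected) {
            committed.add(buffer.pollFirstEntry().getValue());
            expected++;
        }
        nextSeq.put(shard, expected);
    }

    List<String> committed() {
        return committed;
    }

    public static void main(String[] args) {
        SequencedShardWriter writer = new SequencedShardWriter();
        writer.receive(0, 0, "a");
        writer.receive(0, 2, "c"); // arrives early: buffered
        writer.receive(0, 1, "b"); // fills the gap: commits b, then c
        writer.receive(0, 0, "a"); // replay of an already committed element: dropped
        writer.receive(1, 0, "x"); // shards are sequenced independently
        System.out.println(writer.committed()); // [a, b, c, x]
    }
}
```

This scheme only works if the sequence numbers are assigned once and are stable across replays. The sketch simply assumes that.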
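
The duplicate-processing problem Romain describes with the ES I/O's maxSize eager flush can be reproduced in a tiny simulation. It is not ElasticsearchIO's code. `EagerFlushWriter` and `runBundle` are hypothetical names, the sink is just a list, and a failed bundle is assumed to be retried in full:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation of the "maxSize eager flush" workaround: the writer
// flushes whenever its buffer reaches maxSize, regardless of bundle boundaries.
// NOT ElasticsearchIO's code; "sink" stands in for the external store.
class EagerFlushWriter {
    private final int maxSize;
    private final List<String> buffer = new ArrayList<>();
    private final List<String> sink;

    EagerFlushWriter(int maxSize, List<String> sink) {
        this.maxSize = maxSize;
        this.sink = sink;
    }

    void startBundle() {
        buffer.clear();
    }

    void processElement(String record) {
        buffer.add(record);
        if (buffer.size() >= maxSize) {
            flush(); // eager flush: happens before the bundle has completed
        }
    }

    void finishBundle() {
        flush();
    }

    private void flush() {
        sink.addAll(buffer);
        buffer.clear();
    }

    // Runs one bundle; if failAt >= 0, throws before processing element failAt.
    static void runBundle(EagerFlushWriter writer, List<String> bundle, int failAt) {
        writer.startBundle();
        for (int i = 0; i < bundle.size(); i++) {
            if (i == failAt) {
                throw new IllegalStateException("simulated worker failure");
            }
            writer.processElement(bundle.get(i));
        }
        writer.finishBundle();
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        EagerFlushWriter writer = new EagerFlushWriter(2, sink);
        List<String> bundle = Arrays.asList("r1", "r2", "r3");
        try {
            runBundle(writer, bundle, 2); // r1, r2 are flushed, then the bundle fails
        } catch (IllegalStateException e) {
            // Simulated retry: the whole bundle is processed again below.
        }
        runBundle(writer, bundle, -1);
        System.out.println(sink); // [r1, r2, r1, r2, r3]
    }
}
```

Records flushed before the failure reach the sink twice. The eager flush wrote them before the bundle completed, and the retry replays the whole bundle.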
