JB, not sure what you mean? SDFs and triggers are unrelated, and the post doesn't mention the term. Did you mean something else, e.g. restrictions, perhaps? Either way, I don't think SDFs are the solution here: SDFs have to do with the ability to split the processing of *a single element* over multiple calls, whereas Romain, I think, is asking for repeatable grouping of *multiple* elements.
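To make the distinction concrete, the grouping being asked for behaves roughly like the plain-Java sketch below, which batches a list of elements into groups of at most N. No Beam runtime is involved; the class and method names are illustrative, not Beam APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchSketch {
    // Sketch of "repeatable grouping of multiple elements":
    // emit a batch each time `size` elements have accumulated,
    // plus a final partial batch for the leftovers.
    public static <T> List<List<T>> groupIntoBatches(List<T> input, int size) {
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T element : input) {
            current.add(element);
            if (current.size() == size) {
                batches.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            batches.add(current); // last, possibly smaller, batch
        }
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(groupIntoBatches(Arrays.asList(1, 2, 3, 4, 5), 2));
        // prints [[1, 2], [3, 4], [5]]
    }
}
```

In Beam terms this is what a transform like GroupIntoBatches gives you per key, with the runner (not the user code) deciding when a partial batch must be flushed.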
Romain - does
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
do what you want?

On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré <[email protected]> wrote:

> It sounds like the "Trigger" in the Splittable DoFn, no?
>
> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
>
> Regards
> JB
>
> On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote:
> > It gives the fn/transform the ability to save a state - it can get
> > back to it on "restart" / whatever unit we can use, probably runner
> > dependent. Without that you need to rewrite all IO usage with
> > something like the previous pattern, which makes the IO not
> > self-sufficient and raises the entry cost and usage of Beam.
> >
> > In my mind it is exactly what JBatch/Spring Batch uses, but adapted
> > to the Beam (streaming in particular) case.
> >
> > Romain Manni-Bucau
> > @rmannibucau | Blog | Old Blog | Github | LinkedIn
> >
> > 2017-11-17 6:49 GMT+01:00 Reuven Lax <[email protected]>:
> >> Romain,
> >>
> >> Can you define what you mean by checkpoint? What are the semantics?
> >> What does it accomplish?
> >>
> >> Reuven
> >>
> >> On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau
> >> <[email protected]> wrote:
> >>
> >>> Yes, what I proposed earlier was:
> >>>
> >>> I. checkpoint marker:
> >>>
> >>> @AnyBeamAnnotation
> >>> @CheckpointAfter
> >>> public void someHook(SomeContext ctx);
> >>>
> >>> II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new
> >>> CountingAlgo()))
> >>>
> >>> III. (I like this one less)
> >>>
> >>> // in the dofn
> >>> @CheckpointTester
> >>> public boolean shouldCheckpoint();
> >>>
> >>> IV. // in the dofn, per element
> >>> @Checkpointer
> >>> Serializable getCheckpoint();
> >>>
> >>> Romain Manni-Bucau
> >>> @rmannibucau | Blog | Old Blog | Github | LinkedIn
> >>>
> >>> 2017-11-17 6:06 GMT+01:00 Raghu Angadi <[email protected]>:
> >>>> How would you define it (a rough API is fine)? Without more
> >>>> details, it is not easy to see wider applicability and feasibility
> >>>> in runners.
> >>>>
> >>>> On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau
> >>>> <[email protected]> wrote:
> >>>>
> >>>>> This is a fair summary of the current state, but also where Beam
> >>>>> can have very strong added value and make big data great and
> >>>>> smooth.
> >>>>>
> >>>>> Instead of this replay feature, isn't checkpointing workable? In
> >>>>> particular with SDF, no?
> >>>>>
> >>>>> On 16 Nov 2017 at 19:50, "Raghu Angadi" <[email protected]>
> >>>>> wrote:
> >>>>>
> >>>>>> The core issue here is that there is no explicit concept of
> >>>>>> "checkpoint" in Beam (UnboundedSource has a method
> >>>>>> getCheckpointMark, but that refers to the checkpoint on the
> >>>>>> external source). Runners do checkpoint internally as an
> >>>>>> implementation detail. Flink's checkpoint model is entirely
> >>>>>> different from Dataflow's and Spark's.
> >>>>>>
> >>>>>> @StableReplay helps, but by design it does not explicitly talk
> >>>>>> about a checkpoint.
> >>>>>>
> >>>>>> If you are looking to achieve some guarantees with a sink/DoFn,
> >>>>>> I think it is better to start with the requirements. I worked on
> >>>>>> an exactly-once sink for Kafka (see KafkaIO.write().withEOS()),
> >>>>>> where we essentially reshard the elements and assign sequence
> >>>>>> numbers to the elements within each shard. Duplicates in replays
> >>>>>> are avoided based on these sequence numbers. The DoFn state API
> >>>>>> is used to buffer out-of-order replays. The implementation
> >>>>>> strategy works in Dataflow but not in Flink, which has a
> >>>>>> horizontal checkpoint. KafkaIO checks for compatibility.
> >>>>>>
> >>>>>> On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau
> >>>>>> <[email protected]> wrote:
> >>>>>>
> >>>>>>> Hi guys,
> >>>>>>>
> >>>>>>> The subject is a bit provocative, but the topic is real and
> >>>>>>> comes up again and again with Beam usage: how can a DoFn handle
> >>>>>>> some "chunking"?
> >>>>>>>
> >>>>>>> The need is to be able to commit every N records, with N not
> >>>>>>> too big.
> >>>>>>>
> >>>>>>> The natural API for that in Beam is the bundle one, but bundles
> >>>>>>> are not reliable since they can be very small (Flink) - we can
> >>>>>>> say it is "ok" even if it has some perf impact - or too big
> >>>>>>> (Spark does full size / #workers).
> >>>>>>>
> >>>>>>> The workaround is what we see in the ES I/O: a maxSize which
> >>>>>>> does an eager flush. The issue is that the checkpoint is then
> >>>>>>> not respected and you can process the same records multiple
> >>>>>>> times.
> >>>>>>>
> >>>>>>> Any plan to make this API reliable and controllable from a Beam
> >>>>>>> point of view (at least in a max manner)?
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Romain Manni-Bucau
> >>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
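For reference, the per-shard sequence-number scheme Raghu describes for the Kafka exactly-once sink can be sketched in plain Java. This is illustrative only: the class and method names are invented, and where the real sink buffers out-of-order replays in DoFn state, this simplified version just drops anything at or below the last committed sequence number.

```java
import java.util.HashMap;
import java.util.Map;

public class SeqDedup {
    // Highest committed sequence number seen per shard.
    private final Map<Integer, Long> committed = new HashMap<>();

    // Returns true if the element should be written, false if it is
    // a replayed duplicate. (Simplification: a genuinely new element
    // arriving out of order would also be dropped here; the real sink
    // buffers such elements in DoFn state instead.)
    public boolean shouldWrite(int shard, long seq) {
        long last = committed.getOrDefault(shard, -1L);
        if (seq <= last) {
            return false; // already committed in a previous attempt
        }
        committed.put(shard, seq);
        return true;
    }

    public static void main(String[] args) {
        SeqDedup dedup = new SeqDedup();
        System.out.println(dedup.shouldWrite(0, 0)); // true
        System.out.println(dedup.shouldWrite(0, 1)); // true
        System.out.println(dedup.shouldWrite(0, 1)); // false - replay
        System.out.println(dedup.shouldWrite(1, 0)); // true - other shard
    }
}
```

The key property is that resharding plus per-shard numbering makes replays deterministic, so duplicates can be detected without any cross-shard coordination.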
