JB, not sure what you mean? SDFs and triggers are unrelated, and the post
doesn't mention the word "trigger". Did you mean something else, e.g. a
restriction? Either way I don't think SDFs are the solution here; SDFs are
about splitting the processing of *a single element* over multiple calls,
whereas I think Romain is asking for repeatable grouping of *multiple*
elements.

Romain - does
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
do what you want?
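
For reference, a minimal sketch of how it is typically used (assuming an
already-keyed PCollection<KV<String, String>> called "keyed"; the batch size
of 100 and the writeBulk call are just placeholders):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// "keyed" is the input PCollection<KV<String, String>>
PCollection<KV<String, Iterable<String>>> batched =
    keyed.apply(GroupIntoBatches.<String, String>ofSize(100));

batched.apply("WriteBatches", ParDo.of(
    new DoFn<KV<String, Iterable<String>>, Void>() {
      @ProcessElement
      public void process(ProcessContext c) {
        // each call sees up to 100 buffered values for one key,
        // so the bulk write / commit happens once per batch
        writeBulk(c.element().getValue()); // writeBulk is a placeholder
      }
    }));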

On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

> It sounds like the "Trigger" in the Splittable DoFn, no?
>
> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
>
> Regards
> JB
>
>
> On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote:
> > It gives the fn/transform the ability to save state - it can get it
> > back on "restart" / whatever unit we can use, probably runner
> > dependent? Without that you need to rewrite all IO usage with
> > something like the previous pattern, which makes the IO not
> > self-sufficient and considerably raises the entry cost and usage
> > barrier of Beam.
> >
> > In my mind it is exactly what JBatch/Spring Batch does, but adapted to
> > the Beam (streaming in particular) case.
> >
> > Romain Manni-Bucau
> > @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >
> >
> > 2017-11-17 6:49 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
> >> Romain,
> >>
> >> Can you define what you mean by checkpoint? What are the semantics, what
> >> does it accomplish?
> >>
> >> Reuven
> >>
> >> On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau <
> >> rmannibu...@gmail.com> wrote:
> >>
> >>> Yes, what I proposed earlier was:
> >>>
> >>> I. checkpoint marker:
> >>>
> >>> @AnyBeamAnnotation
> >>> @CheckpointAfter
> >>> public void someHook(SomeContext ctx);
> >>>
> >>>
> >>> II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new
> >>> CountingAlgo()))
> >>>
> >>> III. (I like this one less)
> >>>
> >>> // in the dofn
> >>> @CheckpointTester
> >>> public boolean shouldCheckpoint();
> >>>
> >>> IV. @Checkpointer Serializable getCheckpoint(); in the DoFn, per element
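> >>>
> >>> For illustration, option IV in a DoFn could look like the sketch below
> >>> (purely hypothetical - @Checkpointer is the proposal above, not an
> >>> existing Beam API, and MyWriteFn/BulkWriter are placeholders; imports
> >>> omitted):
> >>>
> >>> public class MyWriteFn extends DoFn<String, Void> {
> >>>   private transient BulkWriter writer; // placeholder sink client
> >>>
> >>>   @ProcessElement
> >>>   public void process(ProcessContext c) {
> >>>     writer.add(c.element());
> >>>   }
> >>>
> >>>   // proposed hook: the runner calls this per element and persists the
> >>>   // returned value so processing can resume from it after a failure
> >>>   @Checkpointer
> >>>   public Serializable getCheckpoint() {
> >>>     return writer.lastFlushedPosition();
> >>>   }
> >>> }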
> >>>
> >>>
> >>>
> >>>
> >>> Romain Manni-Bucau
> >>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >>>
> >>>
> >>> 2017-11-17 6:06 GMT+01:00 Raghu Angadi <rang...@google.com.invalid>:
> >>>> How would you define it (rough API is fine)? Without more details, it
> >>>> is not easy to see wider applicability and feasibility in runners.
> >>>>
> >>>> On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau <
> >>>> rmannibu...@gmail.com> wrote:
> >>>>
> >>>>> This is a fair summary of the current state, but it is also where Beam
> >>>>> can add very strong value and make big data great and smooth.
> >>>>>
> >>>>> Instead of this replay feature, isn't checkpointing viable? In
> >>>>> particular with SDF, no?
> >>>>>
> >>>>>
> >>>>> On 16 Nov 2017 at 19:50, "Raghu Angadi" <rang...@google.com.invalid>
> >>>>> wrote:
> >>>>>
> >>>>>> The core issue here is that there is no explicit concept of
> >>>>>> 'checkpoint' in Beam (UnboundedSource has a method 'getCheckpointMark',
> >>>>>> but that refers to the checkpoint on the external source). Runners do
> >>>>>> checkpoint internally as an implementation detail. Flink's checkpoint
> >>>>>> model is entirely different from Dataflow's and Spark's.
> >>>>>>
> >>>>>> @StableReplay helps, but it does not explicitly talk about a
> >>>>>> checkpoint by design.
> >>>>>>
> >>>>>> If you are looking to achieve some guarantees with a sink/DoFn, I
> >>>>>> think it is better to start with the requirements. I worked on the
> >>>>>> exactly-once sink for Kafka (see KafkaIO.write().withEOS()), where we
> >>>>>> essentially reshard the elements and assign sequence numbers to the
> >>>>>> elements within each shard. Duplicates in replays are avoided based on
> >>>>>> these sequence numbers. The DoFn state API is used to buffer
> >>>>>> out-of-order replays. The implementation strategy works in Dataflow but
> >>>>>> not in Flink, which has a horizontal checkpoint. KafkaIO checks for
> >>>>>> compatibility.
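> >>>>>>
> >>>>>> For reference, usage looks roughly like this (just a sketch; "records"
> >>>>>> is a PCollection<KV<String, String>>, and the brokers, topic, shard
> >>>>>> count and group id are placeholders; imports omitted):
> >>>>>>
> >>>>>> records.apply(KafkaIO.<String, String>write()
> >>>>>>     .withBootstrapServers("broker:9092")
> >>>>>>     .withTopic("my-topic")
> >>>>>>     .withKeySerializer(StringSerializer.class)
> >>>>>>     .withValueSerializer(StringSerializer.class)
> >>>>>>     // exactly-once mode: reshard into 5 shards and drop replayed
> >>>>>>     // duplicates using per-shard sequence numbers kept in state
> >>>>>>     .withEOS(5, "my-sink-group"));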
> >>>>>>
> >>>>>> On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau <
> >>>>>> rmannibu...@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi guys,
> >>>>>>>
> >>>>>>> The subject is a bit provocative but the topic is real and comes up
> >>>>>>> again and again with Beam usage: how can a DoFn handle some
> >>>>>>> "chunking"?
> >>>>>>>
> >>>>>>> The need is to be able to commit every N records, but with N not too
> >>>>>>> big.
> >>>>>>>
> >>>>>>> The natural API for that in Beam is the bundle one, but bundles are
> >>>>>>> not reliable since they can be very small (Flink) - we can say it is
> >>>>>>> "ok" even if it has some perf impact - or too big (Spark does full
> >>>>>>> size / #workers).
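> >>>>>>>
> >>>>>>> (By "the bundle one" I mean the @StartBundle/@FinishBundle hooks,
> >>>>>>> roughly as in the sketch below, where flush() is a placeholder bulk
> >>>>>>> commit and imports are omitted.)
> >>>>>>>
> >>>>>>> public class ChunkedWriteFn extends DoFn<String, Void> {
> >>>>>>>   private transient List<String> buffer;
> >>>>>>>
> >>>>>>>   @StartBundle
> >>>>>>>   public void startBundle() {
> >>>>>>>     buffer = new ArrayList<>();
> >>>>>>>   }
> >>>>>>>
> >>>>>>>   @ProcessElement
> >>>>>>>   public void process(ProcessContext c) {
> >>>>>>>     buffer.add(c.element());
> >>>>>>>   }
> >>>>>>>
> >>>>>>>   @FinishBundle
> >>>>>>>   public void finishBundle() {
> >>>>>>>     flush(buffer); // flush = placeholder bulk commit
> >>>>>>>   }
> >>>>>>> }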
> >>>>>>>
> >>>>>>> The workaround is what we see in the ES I/O: a maxSize which does an
> >>>>>>> eager flush. The issue is that the checkpoint is then not respected
> >>>>>>> and you can process the same records multiple times.
> >>>>>>>
> >>>>>>> Any plan to make this API reliable and controllable from a Beam
> >>>>>>> point of view (at least in a "max" manner)?
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Romain Manni-Bucau
> >>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
