Can we describe this at a higher level?

I think what you want is the following. Please correct if I'm
misunderstanding.

Batches of 100 elements (is this a hard requirement, or do they have to be
"approximately" 100 elements?)

Once you see a batch, you're guaranteed to see the same batch on retries.

You want to then idempotently insert this batch into some backend. Things
may fail, workers may crash, but in that case you want to get the exact
same batch back so you can insert it again.

Do you care about ordering? On failure do you have to see the same batches
in the same order as before, or is it sufficient to see the same batches?
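
(If that is the requirement, a minimal sketch of the shape I have in mind,
assuming Beam's GroupIntoBatches transform; Record, the keyed input and the
backend client are made-up placeholders, and the stable-replay guarantee itself
would still have to come from the runner / the proposed annotation, not from
this snippet:)

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.GroupIntoBatches;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    PCollection<KV<String, Record>> keyed = ...; // hypothetical keyed input

    keyed
        // Group into batches of at most 100 elements per key.
        .apply(GroupIntoBatches.<String, Record>ofSize(100))
        // Write each batch; the write must be idempotent so that replaying the
        // same batch after a failure does not create duplicates.
        .apply(ParDo.of(new DoFn<KV<String, Iterable<Record>>, Void>() {
          @ProcessElement
          public void write(ProcessContext c) {
            backend.upsertAll(c.element().getValue()); // hypothetical client
          }
        }));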

Reuven

On Wed, Nov 15, 2017 at 5:58 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
wrote:

> The overall goal is to ensure that, at most every 100 elements, a "backend"
> (e.g. a datastore) flush/commit/push is done and is aligned with Beam
> checkpoints. You can see it as bringing the general commit-interval notion
> to Beam and, in a way, getting rid of the bundle notion, which is almost
> impossible to use today.
>
> Romain Manni-Bucau
> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>
>
> 2017-11-15 10:27 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
> > It's in the dev list archives, not sure if there's a doc yet.
> >
> > I'm not quite sure I understand what you mean by a "flush". Can you
> > describe the problem you're trying to solve?
> >
> > Reuven
> >
> > On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
> > wrote:
> >
> >> Hmm, I didn't find the doc - if you have the link handy it would be
> >> appreciated - but "before" doesn't sound like enough; shouldn't it be
> >> "after", in case there was a "flush"?
> >>
> >> Romain Manni-Bucau
> >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >>
> >>
> >> 2017-11-15 10:10 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
> >> > If you set @StableReplay before a ParDo, it forces a checkpoint before
> >> > that ParDo.
> >> >
> >> > On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
> >> > wrote:
> >> >
> >> >> It sounds like a good start. I'm not sure how a group by key (and not by
> >> >> size) can help control the checkpointing interval. I wonder if we
> >> >> shouldn't be able to have a CheckpointPolicy { boolean shouldCheckpoint() }
> >> >> used in the processing event loop. The default could be up to the runner,
> >> >> but if set on the transform (or DoFn) it would be used to control when the
> >> >> checkpoint is done. Thinking out loud, it sounds close to the JBatch
> >> >> checkpoint algorithm
> >> >> (https://docs.oracle.com/javaee/7/api/javax/batch/api/chunk/CheckpointAlgorithm.html)
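
(For illustration, a rough sketch of what such a hook could look like - purely
hypothetical, none of these interfaces exist in Beam; the names simply mirror
the JBatch CheckpointAlgorithm idea:)

    import java.io.Serializable;

    // Hypothetical: the runner's processing loop would consult this policy to
    // decide when to cut a checkpoint/commit for the current transform.
    public interface CheckpointPolicy extends Serializable {
      boolean shouldCheckpoint(); // called after each processed element
      void reset();               // called once a checkpoint has completed
    }

    // A simple item-count policy, analogous to a JBatch item-count checkpoint.
    class CountingCheckpointPolicy implements CheckpointPolicy {
      private final int interval;
      private int seen;

      CountingCheckpointPolicy(int interval) { this.interval = interval; }

      @Override public boolean shouldCheckpoint() { return ++seen >= interval; }
      @Override public void reset() { seen = 0; }
    }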
> >> >>
> >> >> Romain Manni-Bucau
> >> >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >> >>
> >> >>
> >> >> 2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
> >> >> > Yes, @StableReplay, that's the annotation. Thanks.
> >> >> >
> >> >> >
> >> >> > On 11/15/2017 09:52 AM, Reuven Lax wrote:
> >> >> >>
> >> >> >> Romain,
> >> >> >>
> >> >> >> I think the @StableReplay semantic that Kenn proposed a month or so ago
> >> >> >> is what is needed here.
> >> >> >>
> >> >> >> Essentially it will ensure that the GroupByKey iterable is stable and
> >> >> >> checkpointed. So on replay, the GroupByKey is guaranteed to receive the
> >> >> >> exact same iterable as it did before. The annotation can be put on a
> >> >> >> ParDo as well, in which case it ensures stability (and checkpointing) of
> >> >> >> the individual ParDo elements.
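
(To make the placement concrete - a hypothetical sketch using the annotation
name as proposed in the thread; this is not a released Beam API, and
Record/backend are placeholders:)

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    class WriteBatchFn extends DoFn<KV<String, Iterable<Record>>, Void> {
      // Hypothetical: the proposed annotation asks the runner to checkpoint the
      // input before this ParDo, so a retry replays exactly the same elements.
      @StableReplay
      @ProcessElement
      public void process(ProcessContext c) {
        // Idempotent write: on retry the exact same batch comes back.
        backend.upsertAll(c.element().getValue());
      }
    }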
> >> >> >>
> >> >> >> Reuven
> >> >> >>
> >> >> >> On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau
> >> >> >> <rmannibu...@gmail.com>
> >> >> >> wrote:
> >> >> >>
> >> >> >>> 2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
> >> >> >>>>
> >> >> >>>> Hi Romain,
> >> >> >>>>
> >> >> >>>> You are right: currently, the chunking is related to bundles. Today,
> >> >> >>>> the bundle size is under the runner's responsibility.
> >> >> >>>>
> >> >> >>>> I think that's fine, because only the runner knows an efficient bundle
> >> >> >>>> size. I'm afraid giving "control" of the bundle size to the end user
> >> >> >>>> (via the pipeline) could result in huge performance issues depending on
> >> >> >>>> the runner.
> >> >> >>>>
> >> >> >>>> It doesn't mean that we can't use an uber layer: that's what we do in
> >> >> >>>> ParDoWithBatch or a DoFn in an IO Sink where we have a batch size.
> >> >> >>>>
> >> >> >>>> Anyway, the core problem is about the checkpoint: why is a checkpoint
> >> >> >>>> not "respected" by an IO or runner?
> >> >> >>>
> >> >> >>>
> >> >> >>>
> >> >> >>> Take the example of a runner deciding the bundle size is 4 and the IO
> >> >> >>> deciding the commit-interval (batch semantic) is 2: what happens if the
> >> >> >>> 3rd record fails? You have already pushed 2 records to the store, which
> >> >> >>> can be reprocessed by a restart of the bundle, so you can get duplicates.
> >> >> >>>
> >> >> >>> Rephrased: I think we need, as a framework, a batch/chunk solution which
> >> >> >>> is reliable. I understand bundles are mapped onto the runner and not
> >> >> >>> really controlled, but can we get something more reliable for the user?
> >> >> >>> Maybe we need a @BeforeBatch or something like that.
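
(Sketched, the problematic pattern looks roughly like this - a hand-rolled eager
flush inside the DoFn, essentially the ES IO-style workaround; Record and
backend are placeholders:)

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.transforms.DoFn;

    // Anti-pattern: commit every 2 elements inside the bundle, outside any checkpoint.
    class EagerFlushFn extends DoFn<Record, Void> {
      private final List<Record> buffer = new ArrayList<>();

      @ProcessElement
      public void process(ProcessContext c) {
        buffer.add(c.element());
        if (buffer.size() >= 2) {
          backend.commit(buffer); // already persisted
          buffer.clear();
        }
        // If the 3rd element of a 4-element bundle fails here, the runner retries
        // the whole bundle and the first 2 elements get committed a second time.
      }

      @FinishBundle
      public void finishBundle() {
        if (!buffer.isEmpty()) {
          backend.commit(buffer);
          buffer.clear();
        }
      }
    }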
> >> >> >>>
> >> >> >>>>
> >> >> >>>> Regards
> >> >> >>>> JB
> >> >> >>>>
> >> >> >>>>
> >> >> >>>> On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:
> >> >> >>>>>
> >> >> >>>>>
> >> >> >>>>> Hi guys,
> >> >> >>>>>
> >> >> >>>>> The subject is a bit provocative but the topic is real and comes up
> >> >> >>>>> again and again with Beam usage: how can a DoFn handle some
> >> >> >>>>> "chunking"?
> >> >> >>>>>
> >> >> >>>>> The need is to be able to commit every N records, with N not too big.
> >> >> >>>>>
> >> >> >>>>> The natural API for that in Beam is the bundle one, but bundles are
> >> >> >>>>> not reliable since they can be very small (Flink) - we can say it is
> >> >> >>>>> "ok" even if it has some perf impact - or too big (Spark does full
> >> >> >>>>> size / #workers).
> >> >> >>>>>
> >> >> >>>>> The workaround is what we see in the ES I/O: a maxSize which does an
> >> >> >>>>> eager flush. The issue is that the checkpoint is then not respected
> >> >> >>>>> and you can process the same records multiple times.
> >> >> >>>>>
> >> >> >>>>> Any plan to make this API reliable and controllable from a Beam point
> >> >> >>>>> of view (at least in a max manner)?
> >> >> >>>>>
> >> >> >>>>> Thanks,
> >> >> >>>>> Romain Manni-Bucau
> >> >> >>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >> >> >>>>>
> >> >> >>>>
> >> >> >>>> --
> >> >> >>>> Jean-Baptiste Onofré
> >> >> >>>> jbono...@apache.org
> >> >> >>>> http://blog.nanthrax.net
> >> >> >>>> Talend - http://www.talend.com
> >> >> >>>
> >> >> >>>
> >> >> >>
> >> >> >
> >> >> > --
> >> >> > Jean-Baptiste Onofré
> >> >> > jbono...@apache.org
> >> >> > http://blog.nanthrax.net
> >> >> > Talend - http://www.talend.com
> >> >>
> >>
>
