2017-11-15 11:42 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
> Can we describe this at a higher level?
>
> I think what you want is the following. Please correct if I'm
> misunderstanding.
>
> Batches of 100 elements (is this a hard requirement, or do they have to
> be "approximately" 100 elements?)

Approximately is fine as long as it is documented (what is not fine is
1000000 instead of 10, for instance)
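
For context, this is the kind of best-effort batching DoFn I have in
mind - a rough sketch (it assumes the global window, and since the
runner may end a bundle at any time, the last batch of a bundle can be
smaller than maxBatchSize):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindow;

    class BatchFn<T> extends DoFn<T, List<T>> {
      private final int maxBatchSize;
      private transient List<T> buffer;

      BatchFn(int maxBatchSize) { this.maxBatchSize = maxBatchSize; }

      @StartBundle
      public void startBundle() { buffer = new ArrayList<>(); }

      @ProcessElement
      public void processElement(ProcessContext c) {
        buffer.add(c.element());
        if (buffer.size() >= maxBatchSize) {  // full batch: emit it
          c.output(new ArrayList<>(buffer));
          buffer.clear();
        }
      }

      @FinishBundle
      public void finishBundle(FinishBundleContext c) {
        if (!buffer.isEmpty()) {  // bundle is ending: emit what is left
          c.output(new ArrayList<>(buffer),
              GlobalWindow.INSTANCE.maxTimestamp(), GlobalWindow.INSTANCE);
          buffer.clear();
        }
      }
    }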

>
> Once you see a batch, you're guaranteed to see the same batch on retries.

+1

>
> You want to then idempotently insert this batch into some backend. Things
> may fail, workers may crash, but in that case you want to get the exact
> same batch back so you can insert it again.

+1
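
With a stable batch the write can even be made idempotent by deriving a
deterministic id from the batch content - rough sketch, "backend" being
a hypothetical store client with an idempotent upsert:

    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.UUID;

    void writeBatch(List<String> batch) {
      // A replayed batch produces the same id, so the second upsert
      // overwrites instead of duplicating.
      String batchId = UUID.nameUUIDFromBytes(
          String.join("|", batch).getBytes(StandardCharsets.UTF_8)).toString();
      backend.upsert(batchId, batch);  // hypothetical idempotent API
    }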

>
> Do you care about ordering? On failure do you have to see the same batches
> in the same order as before, or is it sufficient to see the same batches?

Beam doesn't guarantee ordering everywhere, so I guess it is not
important - at least for my cases this statement is true.

>
> Reuven
>
> On Wed, Nov 15, 2017 at 5:58 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
> wrote:
>
>> The overall goal is to ensure that at most every 100 elements, a
>> "backend" (such as a datastore) flush/commit/push is done and is
>> aligned with Beam checkpoints. You can see it as bringing the
>> "general" commit-interval notion to Beam and kind of getting rid of
>> the bundle notion, which is almost impossible to use today.
>>
>> Romain Manni-Bucau
>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>
>>
>> 2017-11-15 10:27 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
>> > It's in the dev list archives, not sure if there's a doc yet.
>> >
>> > I'm not quite sure I understand what you mean by a "flush". Can you
>> > describe the problem you're trying to solve?
>> >
>> > Reuven
>> >
>> > On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau
>> > <rmannibu...@gmail.com> wrote:
>> >
>> >> Hmm, I didn't find the doc - if you have the link handy it would be
>> >> appreciated - but "before" doesn't sound sufficient, shouldn't it be
>> >> "after" in case there was a "flush"?
>> >>
>> >> Romain Manni-Bucau
>> >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >>
>> >>
>> >> 2017-11-15 10:10 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
>> >> > If you set @StableReplay before a ParDo, it forces a checkpoint
>> >> > before that ParDo.
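>> >> >
>> >> > For instance (just a sketch - @StableReplay is only the proposed
>> >> > name, not a merged API, and "backend" is a placeholder client):
>> >> >
>> >> >     class WriteFn extends DoFn<List<String>, Void> {
>> >> >       @StableReplay   // proposed: checkpoint the input first, so
>> >> >       @ProcessElement // a retry replays exactly the same element
>> >> >       public void process(ProcessContext c) {
>> >> >         backend.write(c.element());
>> >> >       }
>> >> >     }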
>> >> >
>> >> > On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau
>> >> > <rmannibu...@gmail.com> wrote:
>> >> >
>> >> >> It sounds like a good start. I'm not sure how a group by key (and
>> >> >> not by size) can help control the checkpointing interval. I wonder
>> >> >> if we shouldn't have a CheckpointPolicy { boolean shouldCheckpoint() }
>> >> >> used in the processing event loop. The default could be up to the
>> >> >> runner, but if set on the transform (or DoFn) it would be used to
>> >> >> control when the checkpoint is done. Thinking out loud, it sounds
>> >> >> close to the jbatch checkpoint algorithm
>> >> >> (https://docs.oracle.com/javaee/7/api/javax/batch/api/chunk/CheckpointAlgorithm.html)
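>> >> >>
>> >> >> Something like this, as a rough sketch (all names invented):
>> >> >>
>> >> >>     public interface CheckpointPolicy extends java.io.Serializable {
>> >> >>       // Called by the runner after each processed element; true
>> >> >>       // means "checkpoint now" (and let the IO flush/commit).
>> >> >>       boolean shouldCheckpoint();
>> >> >>     }
>> >> >>
>> >> >>     // Example: checkpoint every N elements, like a jbatch
>> >> >>     // item-count CheckpointAlgorithm.
>> >> >>     public class CountCheckpointPolicy implements CheckpointPolicy {
>> >> >>       private final int interval;
>> >> >>       private int count;
>> >> >>
>> >> >>       public CountCheckpointPolicy(int interval) {
>> >> >>         this.interval = interval;
>> >> >>       }
>> >> >>
>> >> >>       @Override
>> >> >>       public boolean shouldCheckpoint() {
>> >> >>         return ++count % interval == 0;
>> >> >>       }
>> >> >>     }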
>> >> >>
>> >> >> Romain Manni-Bucau
>> >> >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >> >>
>> >> >>
>> >> >> 2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>> >> >> > Yes, @StableReplay, that's the annotation. Thanks.
>> >> >> >
>> >> >> >
>> >> >> > On 11/15/2017 09:52 AM, Reuven Lax wrote:
>> >> >> >>
>> >> >> >> Romain,
>> >> >> >>
>> >> >> >> I think the @StableReplay semantic that Kenn proposed a month or
>> >> >> >> so ago is what is needed here.
>> >> >> >>
>> >> >> >> Essentially it will ensure that the GroupByKey iterable is stable
>> >> >> >> and checkpointed. So on replay, the GroupByKey is guaranteed to
>> >> >> >> receive the exact same iterable as it did before. The annotation
>> >> >> >> can be put on a ParDo as well, in which case it ensures stability
>> >> >> >> (and checkpointing) of the individual ParDo elements.
>> >> >> >>
>> >> >> >> Reuven
>> >> >> >>
>> >> >> >> On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau
>> >> >> >> <rmannibu...@gmail.com>
>> >> >> >> wrote:
>> >> >> >>
>> >> >> >>> 2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>> >> >> >>>>
>> >> >> >>>> Hi Romain,
>> >> >> >>>>
>> >> >> >>>> You are right: currently, the chunking is related to bundles.
>> >> >> >>>> Today, the bundle size is under the runner's responsibility.
>> >> >> >>>>
>> >> >> >>>> I think it's fine because only the runner knows an efficient
>> >> >> >>>> bundle size. I'm afraid giving "control" of the bundle size to
>> >> >> >>>> the end user (via the pipeline) can result in huge performance
>> >> >> >>>> issues depending on the runner.
>> >> >> >>>>
>> >> >> >>>> It doesn't mean that we can't use an uber layer: it's what we
>> >> >> >>>> do in ParDoWithBatch or a DoFn in an IO sink where we have a
>> >> >> >>>> batch size.
>> >> >> >>>>
>> >> >> >>>> Anyway, the core problem is about the checkpoint: why is a
>> >> >> >>>> checkpoint not "respected" by an IO or runner?
>> >> >> >>>
>> >> >> >>>
>> >> >> >>>
>> >> >> >>> Take the example of a runner deciding the bundle size is 4 and
>> >> >> >>> the IO deciding the commit-interval (batch semantic) is 2: what
>> >> >> >>> happens if the 3rd record fails? You have already pushed 2
>> >> >> >>> records to the store, which can be reprocessed by a restart of
>> >> >> >>> the bundle, and you can get duplicates.
>> >> >> >>>
>> >> >> >>> Rephrased: I think we need, as a framework, a batch/chunk
>> >> >> >>> solution which is reliable. I understand bundles are mapped to
>> >> >> >>> the runner and not really controlled, but can we get something
>> >> >> >>> more reliable for the user? Maybe we need a @BeforeBatch or
>> >> >> >>> something like that.
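>> >> >> >>>
>> >> >> >>> To make the failure concrete, a sketch of the eager-flush
>> >> >> >>> pattern ("backend" being any hypothetical store client):
>> >> >> >>>
>> >> >> >>>     class EagerFlushFn extends DoFn<String, Void> {
>> >> >> >>>       private final List<String> buffer = new ArrayList<>();
>> >> >> >>>
>> >> >> >>>       @ProcessElement
>> >> >> >>>       public void process(ProcessContext c) {
>> >> >> >>>         buffer.add(c.element());
>> >> >> >>>         if (buffer.size() >= 2) {  // commit-interval = 2
>> >> >> >>>           backend.push(buffer);    // persisted outside Beam
>> >> >> >>>           buffer.clear();
>> >> >> >>>         }
>> >> >> >>>       }
>> >> >> >>>     }
>> >> >> >>>
>> >> >> >>>     // If the 3rd element of a 4-element bundle fails, the whole
>> >> >> >>>     // bundle is retried and elements 1 and 2 get pushed twice.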
>> >> >> >>>
>> >> >> >>>>
>> >> >> >>>> Regards
>> >> >> >>>> JB
>> >> >> >>>>
>> >> >> >>>>
>> >> >> >>>> On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:
>> >> >> >>>>>
>> >> >> >>>>>
>> >> >> >>>>> Hi guys,
>> >> >> >>>>>
>> >> >> >>>>> The subject is a bit provocative, but the topic is real and
>> >> >> >>>>> comes up again and again with Beam usage: how can a DoFn handle
>> >> >> >>>>> some "chunking"?
>> >> >> >>>>>
>> >> >> >>>>> The need is to be able to commit every N records, with N not
>> >> >> >>>>> too big.
>> >> >> >>>>>
>> >> >> >>>>> The natural API for that in Beam is the bundle one, but bundles
>> >> >> >>>>> are not reliable since they can be very small (Flink) - we can
>> >> >> >>>>> say it is "ok" even if it has some perf impact - or too big
>> >> >> >>>>> (Spark does full size / #workers).
>> >> >> >>>>>
>> >> >> >>>>> The workaround is what we see in the ES I/O: a maxSize which
>> >> >> >>>>> does an eager flush. The issue is that the checkpoint is then
>> >> >> >>>>> not respected and you can process the same records multiple
>> >> >> >>>>> times.
>> >> >> >>>>>
>> >> >> >>>>> Any plan to make this API reliable and controllable from a Beam
>> >> >> >>>>> point of view (at least with a max bound)?
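>> >> >> >>>>>
>> >> >> >>>>> For reference, the eager flush I mean looks like this (sketch,
>> >> >> >>>>> the exact ElasticsearchIO option names may differ):
>> >> >> >>>>>
>> >> >> >>>>>     pipeline
>> >> >> >>>>>         .apply(...)
>> >> >> >>>>>         .apply(ElasticsearchIO.write()
>> >> >> >>>>>             .withConnectionConfiguration(conn)
>> >> >> >>>>>             // flushes every 100 buffered docs, independently
>> >> >> >>>>>             // of any runner checkpoint
>> >> >> >>>>>             .withMaxBatchSize(100));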
>> >> >> >>>>>
>> >> >> >>>>> Thanks,
>> >> >> >>>>> Romain Manni-Bucau
>> >> >> >>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >> >> >>>>>
>> >> >> >>>>
>> >> >> >>>> --
>> >> >> >>>> Jean-Baptiste Onofré
>> >> >> >>>> jbono...@apache.org
>> >> >> >>>> http://blog.nanthrax.net
>> >> >> >>>> Talend - http://www.talend.com
>> >> >> >>>
>> >> >> >>>
>> >> >> >>
>> >> >> >
>> >> >> > --
>> >> >> > Jean-Baptiste Onofré
>> >> >> > jbono...@apache.org
>> >> >> > http://blog.nanthrax.net
>> >> >> > Talend - http://www.talend.com
>> >> >>
>> >>
>>
