In case the connection is not clear to folks on this thread, I pinged the
thread on @StableReplay / @RequiresStableInput / etc and opened a draft PR
at https://github.com/apache/beam/pull/4135.

On Wed, Nov 15, 2017 at 3:24 AM, Reuven Lax <re...@google.com.invalid>
wrote:

> so I think the following will do exactly that and can be easily factored
> into a reusable transform (modulo Java type boilerplate):
>
> pCollection
>     .apply(WithKeys.of((Element e) -> ThreadLocalRandom.current().nextInt(N))
>         .withKeyType(TypeDescriptors.integers()))
>     .apply(Window.<KV<Integer, Element>>into(new GlobalWindows())
>         .triggering(AfterWatermark.pastEndOfWindow()
>             .withEarlyFirings(AfterPane.elementCountAtLeast(100)))
>         .discardingFiredPanes())  // each pane carries only new elements
>     .apply(GroupByKey.create())
>     .apply(ParDo.of(new DoFn<KV<Integer, Iterable<Element>>, Void>() {
>       @ProcessElement
>       @StableReplay  // the proposed annotation discussed in this thread
>       public void processElement(ProcessContext c) {
>         // Insert c.element().getValue() into backend.
>       }
>     }));
>
> On Wed, Nov 15, 2017 at 7:00 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
> wrote:
>
> > 2017-11-15 11:42 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
> > > Can we describe this at a higher level?
> > >
> > > I think what you want is the following. Please correct if I'm
> > > misunderstanding.
> > >
> > > Batches of 100 elements (is this a hard requirement, or do they have to be
> > > "approximately" 100 elements?)
> >
> > Approximately is fine as long as it is documented (what is not fine is
> > 1000000 instead of 10, for instance)
> >
> > >
> > > Once you see a batch, you're guaranteed to see the same batch on retries.
> >
> > +1
> >
> > >
> > > You want to then idempotently insert this batch into some backend. Things
> > > may fail, workers may crash, but in that case you want to get the exact
> > > same batch back so you can insert it again.
> >
> > +1
> >
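The two requirements agreed on above (the same batch on retry, and an idempotent insert) can be illustrated with a minimal plain-Java sketch. Everything in it is an assumption made for illustration: the in-memory `backend` map, the `batchId` helper, and the use of `List.hashCode()` as a content-derived id are hypothetical and not part of Beam or of any proposal in this thread. A real sink would likely use a stronger digest or a backend-native upsert key.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class IdempotentBatchDemo {
    /** Stand-in backend keyed by batch id; writing an existing id overwrites it. */
    static final Map<Integer, List<String>> backend = new LinkedHashMap<>();

    /** Id derived only from the batch contents, so a replayed batch gets the same id. */
    static int batchId(List<String> batch) {
        return batch.hashCode();
    }

    /** Idempotent insert: repeating it with an identical batch leaves the backend unchanged. */
    static void insert(List<String> batch) {
        backend.put(batchId(batch), batch);
    }

    /** Inserts a batch, replays the identical batch, and returns the number of stored batches. */
    static int storedBatchesAfterReplay() {
        backend.clear();
        insert(Arrays.asList("a", "b", "c"));
        insert(Arrays.asList("a", "b", "c")); // replay after a simulated worker crash
        return backend.size();
    }

    public static void main(String[] args) {
        System.out.println(storedBatchesAfterReplay()); // prints 1
    }
}
```

The sketch only works because the replayed batch is identical to the first one. If a retry could regroup the elements differently, the ids would differ and the backend would hold overlapping batches.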
> > >
> > > Do you care about ordering? On failure do you have to see the same batches
> > > in the same order as before, or is it sufficient to see the same batches?
> >
> > Beam doesn't guarantee ordering everywhere, so I guess it is not important;
> > at least for my cases this statement is true.
> >
> > >
> > > Reuven
> > >
> > > On Wed, Nov 15, 2017 at 5:58 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
> > > wrote:
> > >
> > >> The overall goal is to ensure that, every 100 elements at most, a "backend"
> > >> (such as a datastore) flush/commit/push is done and is aligned with Beam
> > >> checkpoints. You can see it as bringing the "general" commit-interval
> > >> notion to Beam and more or less getting rid of the bundle notion, which is
> > >> almost impossible to use today.
> > >>
> > >> Romain Manni-Bucau
> > >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> > >>
> > >>
> > >> 2017-11-15 10:27 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
> > >> > It's in the dev list archives, not sure if there's a doc yet.
> > >> >
> > >> > I'm not quite sure I understand what you mean by a "flush". Can you describe
> > >> > the problem you're trying to solve?
> > >> >
> > >> > Reuven
> > >> >
> > >> > On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
> > >> > wrote:
> > >> >
> > >> >> Hmm, I didn't find the doc - if you have the link handy it would be
> > >> >> appreciated - but "before" doesn't sound like enough; shouldn't it be
> > >> >> "after" in case there was a "flush"?
> > >> >>
> > >> >> Romain Manni-Bucau
> > >> >>
> > >> >>
> > >> >> 2017-11-15 10:10 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
> > >> >> > If you set @StableReplay before a ParDo, it forces a checkpoint before that
> > >> >> > ParDo.
> > >> >> >
> > >> >> > On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
> > >> >> > wrote:
> > >> >> >
> > >> >> >> It sounds like a good start. I'm not sure how a group by key (and not by
> > >> >> >> size) can help control the checkpointing interval. I wonder if we
> > >> >> >> shouldn't be able to have a CheckpointPolicy { boolean
> > >> >> >> shouldCheckpoint() } used in the processing event loop. The default could
> > >> >> >> be up to the runner, but if set on the transform (or DoFn) it would be
> > >> >> >> used to control when the checkpoint is done. Thinking out loud, it
> > >> >> >> sounds close to the JBatch checkpoint algorithm
> > >> >> >> (https://docs.oracle.com/javaee/7/api/javax/batch/api/chunk/CheckpointAlgorithm.html)
> > >> >> >>
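The CheckpointPolicy idea floated above could look roughly like the following plain-Java sketch. It is purely hypothetical: no such interface exists in Beam, and `CountingPolicy` and the toy `process` loop are names made up here only to show how a policy consulted in the event loop would turn a stream of elements into checkpoint-aligned chunks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CheckpointPolicyDemo {
    /** Hypothetical interface as written in the message above; not a Beam API. */
    interface CheckpointPolicy {
        boolean shouldCheckpoint();
    }

    /** Asks for a checkpoint once every `interval` calls, i.e. every `interval` elements. */
    static final class CountingPolicy implements CheckpointPolicy {
        private final int interval;
        private int seen;

        CountingPolicy(int interval) {
            this.interval = interval;
        }

        @Override
        public boolean shouldCheckpoint() {
            seen++;
            if (seen >= interval) {
                seen = 0;
                return true;
            }
            return false;
        }
    }

    /** Toy event loop: consults the policy after each element and closes a chunk at every checkpoint. */
    static <T> List<List<T>> process(List<T> elements, CheckpointPolicy policy) {
        List<List<T>> chunks = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T element : elements) {
            current.add(element);
            if (policy.shouldCheckpoint()) {
                chunks.add(current); // a real runner would checkpoint and flush here
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            chunks.add(current); // trailing partial chunk at end of input
        }
        return chunks;
    }

    public static void main(String[] args) {
        // prints [[1, 2], [3, 4], [5]]
        System.out.println(process(Arrays.asList(1, 2, 3, 4, 5), new CountingPolicy(2)));
    }
}
```

In this sketch the chunk boundary and the checkpoint are the same event, which is the alignment being asked for. Whether a runner could honor such a policy efficiently is exactly the open question in the thread.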
> > >> >> >> Romain Manni-Bucau
> > >> >> >>
> > >> >> >>
> > >> >> >> 2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
> > >> >> >> > Yes, @StableReplay, that's the annotation. Thanks.
> > >> >> >> >
> > >> >> >> >
> > >> >> >> > On 11/15/2017 09:52 AM, Reuven Lax wrote:
> > >> >> >> >>
> > >> >> >> >> Romain,
> > >> >> >> >>
> > >> >> >> >> I think the @StableReplay semantic that Kenn proposed a month or so ago is
> > >> >> >> >> what is needed here.
> > >> >> >> >>
> > >> >> >> >> Essentially it will ensure that the GroupByKey iterable is stable and
> > >> >> >> >> checkpointed. So on replay, the GroupByKey is guaranteed to receive the
> > >> >> >> >> exact same iterable as it did before. The annotation can be put on a ParDo
> > >> >> >> >> as well, in which case it ensures stability (and checkpointing) of the
> > >> >> >> >> individual ParDo elements.
> > >> >> >> >>
> > >> >> >> >> Reuven
> > >> >> >> >>
> > >> >> >> >> On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau
> > >> >> >> >> <rmannibu...@gmail.com>
> > >> >> >> >> wrote:
> > >> >> >> >>
> > >> >> >> >>> 2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
> > >> >> >> >>>>
> > >> >> >> >>>> Hi Romain,
> > >> >> >> >>>>
> > >> >> >> >>>> You are right: currently, the chunking is related to bundles. Today, the
> > >> >> >> >>>> bundle size is the runner's responsibility.
> > >> >> >> >>>>
> > >> >> >> >>>> I think that's fine because only the runner knows an efficient bundle size.
> > >> >> >> >>>> I'm afraid giving "control" of the bundle size to the end user (via the
> > >> >> >> >>>> pipeline) can result in huge performance issues, depending on the runner.
> > >> >> >> >>>>
> > >> >> >> >>>> It doesn't mean that we can't use an uber layer: it's what we do in
> > >> >> >> >>>> ParDoWithBatch or DoFn in IO Sink where we have a batch size.
> > >> >> >> >>>>
> > >> >> >> >>>> Anyway, the core problem is about the checkpoint: why is a checkpoint
> > >> >> >> >>>> not "respected" by an IO or runner?
> > >> >> >> >>>
> > >> >> >> >>>
> > >> >> >> >>>
> > >> >> >> >>> Take the example of a runner deciding the bundle size is 4 and the IO
> > >> >> >> >>> deciding the commit-interval (batch semantic) is 2: what happens if
> > >> >> >> >>> the 3rd record fails? You have pushed 2 records to the store, which can
> > >> >> >> >>> be reprocessed by a restart of the bundle, and you can get duplicates.
> > >> >> >> >>>
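The scenario above (bundle size 4, commit interval 2, failure on the 3rd record) can be reproduced with a small plain-Java simulation. This is only a sketch under stated assumptions: `store` is an in-memory stand-in for the external backend, `processBundle` is a hypothetical helper rather than any Beam or IO code, and the retry is modeled as simply reprocessing the whole bundle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BundleRetryDemo {
    /** In-memory stand-in for the external store; it records every record ever committed. */
    static final List<String> store = new ArrayList<>();

    /**
     * Processes one bundle, flushing to the store every commitInterval records.
     * Throws when reaching index failAt (0-based); pass -1 for a run without failure.
     */
    static void processBundle(List<String> bundle, int commitInterval, int failAt) {
        List<String> buffer = new ArrayList<>();
        for (int i = 0; i < bundle.size(); i++) {
            if (i == failAt) {
                throw new IllegalStateException("record " + bundle.get(i) + " failed");
            }
            buffer.add(bundle.get(i));
            if (buffer.size() == commitInterval) {
                store.addAll(buffer); // eager flush, not aligned with any checkpoint
                buffer.clear();
            }
        }
        store.addAll(buffer); // flush whatever is left at the end of the bundle
    }

    /** Runs the failing attempt followed by a full-bundle retry and returns the store contents. */
    static List<String> run() {
        store.clear();
        List<String> bundle = Arrays.asList("r1", "r2", "r3", "r4");
        try {
            processBundle(bundle, 2, 2); // r1 and r2 are flushed, then the 3rd record fails
        } catch (IllegalStateException e) {
            // the runner restarts the whole bundle
        }
        processBundle(bundle, 2, -1);
        return new ArrayList<>(store);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [r1, r2, r1, r2, r3, r4]
    }
}
```

The first two records end up in the store twice because the flush happened before the bundle was committed, which is the duplicate problem being described.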
> > >> >> >> >>> Rephrased: I think we need, as a framework, a batch/chunk solution which
> > >> >> >> >>> is reliable. I understand bundles are mapped onto the runner and not
> > >> >> >> >>> really controlled, but can we get something more reliable for the user?
> > >> >> >> >>> Maybe we need a @BeforeBatch or something like that.
> > >> >> >> >>>
> > >> >> >> >>>>
> > >> >> >> >>>> Regards
> > >> >> >> >>>> JB
> > >> >> >> >>>>
> > >> >> >> >>>>
> > >> >> >> >>>> On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:
> > >> >> >> >>>>>
> > >> >> >> >>>>>
> > >> >> >> >>>>> Hi guys,
> > >> >> >> >>>>>
> > >> >> >> >>>>> The subject is a bit provocative, but the topic is real and keeps coming
> > >> >> >> >>>>> up again and again with Beam usage: how can a DoFn handle some
> > >> >> >> >>>>> "chunking"?
> > >> >> >> >>>>>
> > >> >> >> >>>>> The need is to be able to commit every N records, with N not too big.
> > >> >> >> >>>>>
> > >> >> >> >>>>> The natural API for that in Beam is the bundle one, but bundles are not
> > >> >> >> >>>>> reliable since they can be very small (Flink) - we can say it is "ok"
> > >> >> >> >>>>> even if it has some perf impact - or too big (Spark does full size /
> > >> >> >> >>>>> #workers).
> > >> >> >> >>>>>
> > >> >> >> >>>>> The workaround is what we see in the ES I/O: a maxSize which does an
> > >> >> >> >>>>> eager flush. The issue is that the checkpoint is then not respected
> > >> >> >> >>>>> and you can process the same records multiple times.
> > >> >> >> >>>>>
> > >> >> >> >>>>> Any plan to make this API reliable and controllable from a Beam point
> > >> >> >> >>>>> of view (at least as an upper bound)?
> > >> >> >> >>>>>
> > >> >> >> >>>>> Thanks,
> > >> >> >> >>>>> Romain Manni-Bucau
> > >> >> >> >>>>>
> > >> >> >> >>>>
> > >> >> >> >>>> --
> > >> >> >> >>>> Jean-Baptiste Onofré
> > >> >> >> >>>> jbono...@apache.org
> > >> >> >> >>>> http://blog.nanthrax.net
> > >> >> >> >>>> Talend - http://www.talend.com
> > >> >> >> >>>
> > >> >> >> >>>
> > >> >> >> >>
> > >> >> >> >
> > >> >> >> > --
> > >> >> >> > Jean-Baptiste Onofré
> > >> >> >> > jbono...@apache.org
> > >> >> >> > http://blog.nanthrax.net
> > >> >> >> > Talend - http://www.talend.com
> > >> >> >>
> > >> >>
> > >>
> >
>
