On Wed, Nov 15, 2017 at 9:16 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
wrote:

> @Reuven: it looks like a good workaround
> @Ken: thanks a lot for the link!
>
> @all:
>
> 1. do you think it is doable without using windowing (to have
> something more reliable across runners, since it will depend on fewer
> primitives)?
>

This depends on triggering, not on windowing. Triggering is a pretty core
component of the model - no unbounded inputs can be processed at all
without triggering. "Checkpointing" is a harder thing to pin down, as it
means different things to different runners (e.g. "checkpointing" in Flink
means something very different than in Dataflow and different than in
Spark).
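
A minimal sketch of the distinction, in plain Java with no Beam dependency: an element-count trigger decides when buffered elements of a single global window are emitted as a pane, independently of how elements are assigned to windows. The names `CountTriggerSketch`, `CountTrigger`, `add`, and `flush` are invented for this illustration and are not Beam APIs; a real runner's trigger machinery is far more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class CountTriggerSketch {

    /** Buffers elements of one global window and fires a pane every `threshold` elements. */
    public static class CountTrigger<T> {
        private final int threshold;
        private final Consumer<List<T>> onPane;
        private List<T> buffer = new ArrayList<>();

        public CountTrigger(int threshold, Consumer<List<T>> onPane) {
            this.threshold = threshold;
            this.onPane = onPane;
        }

        /** Adds one element; fires as soon as the buffer reaches the threshold. */
        public void add(T element) {
            buffer.add(element);
            if (buffer.size() >= threshold) {
                fire();
            }
        }

        /** Emits whatever is still buffered, e.g. if the input eventually ends. */
        public void flush() {
            if (!buffer.isEmpty()) {
                fire();
            }
        }

        private void fire() {
            onPane.accept(buffer);
            buffer = new ArrayList<>(); // start a fresh pane
        }
    }

    public static void main(String[] args) {
        List<List<Integer>> panes = new ArrayList<>();
        CountTrigger<Integer> trigger = new CountTrigger<>(3, panes::add);
        for (int i = 0; i < 7; i++) {
            trigger.add(i);
        }
        trigger.flush();
        System.out.println(panes);
    }
}
```

Without the `add`-time check, nothing would be emitted until `flush`, which never happens on an unbounded input; that is the sense in which triggering, not windowing, is what matters here.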



> 2. what about allowing the user to define when to checkpoint?
>

As I mentioned, "checkpoint" is sometimes an ill-defined operation,
especially across different runners. Instead I think it's better to have
an annotation that defines the semantics you want (e.g. stable replay), and
let the runner decide how to implement it (possibly by checkpointing).
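
As a rough illustration of "declare the semantics, let the runner pick the mechanism", here is a plain-Java sketch with no Beam dependency. The `StableReplay` annotation defined below is a stand-in written for this example, not the annotation from the draft PR, and `WriteFn` and `ToyRunner` are hypothetical names. This toy runner happens to persist a copy of the input before invoking the method; another runner could honour the same annotation differently.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StableReplaySketch {

    /** Hypothetical marker: "this method needs identical input on every retry". */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface StableReplay {}

    /** Toy user function; stands in for a DoFn that writes to a backend. */
    public static class WriteFn {
        public final List<String> written = new ArrayList<>();

        @StableReplay
        public void process(List<String> batch) {
            written.addAll(batch);
        }
    }

    /** Toy runner: reads the declared semantics and chooses its own mechanism. */
    public static class ToyRunner {
        /** Stands in for whatever durable storage this particular runner uses. */
        public final List<List<String>> durableLog = new ArrayList<>();

        public void run(WriteFn fn, List<String> batch) {
            try {
                Method m = WriteFn.class.getMethod("process", List.class);
                if (m.isAnnotationPresent(StableReplay.class)) {
                    // This runner's choice: persist a copy before processing.
                    durableLog.add(new ArrayList<>(batch));
                }
                m.invoke(fn, batch);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    public static void main(String[] args) {
        ToyRunner runner = new ToyRunner();
        WriteFn fn = new WriteFn();
        runner.run(fn, Arrays.asList("a", "b"));
        System.out.println(runner.durableLog + " " + fn.written);
    }
}
```

The user code never mentions checkpoints; only the runner knows that persisting a copy is how it satisfies the annotation.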

> 3. can we get this kind of "composite" pattern in the beam core?
>

I don't see why not. Though we first need to get @StableReplay implemented.
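
To illustrate why the composite depends on stable input, here is a toy plain-Java sketch of the retry scenario discussed in the quoted messages: a backend that deduplicates per batch is only idempotent if a retry delivers the exact same batch. `IdempotentBatchSketch` and `Backend` are hypothetical names, and using `List.hashCode()` as a batch id is an assumption made for brevity; a real sink would need a collision-safe identifier.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class IdempotentBatchSketch {

    /** Toy backend that skips any batch whose id it has already seen. */
    public static class Backend {
        private final Set<Integer> seenBatchIds = new HashSet<>();
        public final List<String> rows = new ArrayList<>();

        public void insert(List<String> batch) {
            // Assumption for illustration only: the id is derived from the contents.
            int batchId = batch.hashCode();
            if (seenBatchIds.add(batchId)) {
                rows.addAll(batch);
            }
        }
    }

    public static void main(String[] args) {
        // Stable replay: the retry sees the same batch, so the insert is a no-op.
        Backend stable = new Backend();
        stable.insert(Arrays.asList("a", "b", "c"));
        stable.insert(Arrays.asList("a", "b", "c"));
        System.out.println("stable rows: " + stable.rows.size());

        // Unstable replay: the retry sees a different batch, so rows are duplicated.
        Backend unstable = new Backend();
        unstable.insert(Arrays.asList("a", "b", "c"));
        unstable.insert(Arrays.asList("a", "b"));
        System.out.println("unstable rows: " + unstable.rows.size());
    }
}
```

In the first case the backend ends up with three rows; in the second it ends up with five, because the re-batched retry is not recognised as a repeat.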


>
>
>
> Romain Manni-Bucau
> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>
>
> 2017-11-15 14:12 GMT+01:00 Kenneth Knowles <k...@google.com.invalid>:
> > In case the connection is not clear to folks on this thread, I pinged the
> > thread on @StableReplay / @RequiresStableInput / etc and opened a draft PR
> > at https://github.com/apache/beam/pull/4135.
> >
> > On Wed, Nov 15, 2017 at 3:24 AM, Reuven Lax <re...@google.com.invalid>
> > wrote:
> >
> >> so I think the following will do exactly that and can be easily factored
> >> into a reusable transform (modulo Java type boilerplate):
> >>
> >> pCollection.apply(WithKeys.of((Element e) ->
> >>                       ThreadLocalRandom.current().nextInt(N)))
> >>                   .apply(Window.into(new GlobalWindows())
> >>                       .triggering(AfterWatermark.pastEndOfWindow()
> >>                           .withEarlyFirings(AfterPane.elementCountAtLeast(100))))
> >>                   .apply(GroupByKey.create())
> >>                   .apply(ParDo.of(new DoFn<>() {
> >>                       @ProcessElement
> >>                       @StableReplay
> >>                       public void processElement(ProcessContext c) {
> >>                         // Insert c.element().getValue() into backend.
> >>                       }
> >>                   }));
> >>
> >> On Wed, Nov 15, 2017 at 7:00 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
> >> wrote:
> >>
> >> > 2017-11-15 11:42 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
> >> > > Can we describe this at a higher level?
> >> > >
> >> > > I think what you want is the following. Please correct if I'm
> >> > > misunderstanding.
> >> > >
> >> > > Batches of 100 elements (is this a hard requirement, or do they have to
> >> > > be "approximately" 100 elements?)
> >> >
> >> > Approximately is fine as long as it is documented (what is not fine is
> >> > 1000000 instead of 10, for instance)
> >> >
> >> > >
> >> > > Once you see a batch, you're guaranteed to see the same batch on retries.
> >> >
> >> > +1
> >> >
> >> > >
> >> > > You want to then idempotently insert this batch into some backend. Things
> >> > > may fail, workers may crash, but in that case you want to get the exact
> >> > > same batch back so you can insert it again.
> >> >
> >> > +1
> >> >
> >> > >
> >> > > Do you care about ordering? On failure do you have to see the same batches
> >> > > in the same order as before, or is it sufficient to see the same batches?
> >> >
> >> > Beam doesn't care about ordering everywhere, so I guess it is not
> >> > important - at least for my cases this statement is true.
> >> >
> >> > >
> >> > > Reuven
> >> > >
> >> > > On Wed, Nov 15, 2017 at 5:58 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
> >> > > wrote:
> >> > >
> >> > >> Overall goal is to ensure that, every 100 elements at most, a "backend"
> >> > >> (e.g. a datastore) flush/commit/push is done and is aligned with beam
> >> > >> checkpoints. You can see it as bringing the "general" commit-interval
> >> > >> notion to beam and kind of getting rid of the bundle notion, which is
> >> > >> almost impossible to use today.
> >> > >>
> >> > >> Romain Manni-Bucau
> >> > >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >> > >>
> >> > >>
> >> > >> 2017-11-15 10:27 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
> >> > >> > It's in the dev list archives, not sure if there's a doc yet.
> >> > >> >
> >> > >> > I'm not quite sure I understand what you mean by a "flush". Can you
> >> > >> > describe the problem you're trying to solve?
> >> > >> >
> >> > >> > Reuven
> >> > >> >
> >> > >> > On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
> >> > >> > wrote:
> >> > >> >
> >> > >> >> Hmm, I didn't find the doc - if you have the link handy it would be
> >> > >> >> appreciated - but "before" doesn't sound like enough; it should be
> >> > >> >> "after" in case there was a "flush", no?
> >> > >> >>
> >> > >> >> Romain Manni-Bucau
> >> > >> >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >> > >> >>
> >> > >> >>
> >> > >> >> 2017-11-15 10:10 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
> >> > >> >> > If you set @StableReplay before a ParDo, it forces a checkpoint before
> >> > >> >> > that ParDo.
> >> > >> >> >
> >> > >> >> > On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
> >> > >> >> > wrote:
> >> > >> >> >
> >> > >> >> >> It sounds like a good start. I'm not sure how a group by key (and not
> >> > >> >> >> by size) can help control the checkpointing interval. I wonder if
> >> > >> >> >> we shouldn't be able to have a CheckpointPolicy { boolean
> >> > >> >> >> shouldCheckpoint() } used in the processing event loop. The default
> >> > >> >> >> could be up to the runner, but if set on the transform (or dofn) it
> >> > >> >> >> would be used to control when the checkpoint is done. Thinking out loud,
> >> > >> >> >> it sounds close to the jbatch checkpoint algorithm
> >> > >> >> >> (https://docs.oracle.com/javaee/7/api/javax/batch/api/chunk/CheckpointAlgorithm.html)
> >> > >> >> >>
> >> > >> >> >> Romain Manni-Bucau
> >> > >> >> >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >> > >> >> >>
> >> > >> >> >>
> >> > >> >> >> 2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
> >> > >> >> >> > Yes, @StableReplay, that's the annotation. Thanks.
> >> > >> >> >> >
> >> > >> >> >> >
> >> > >> >> >> > On 11/15/2017 09:52 AM, Reuven Lax wrote:
> >> > >> >> >> >>
> >> > >> >> >> >> Romain,
> >> > >> >> >> >>
> >> > >> >> >> >> I think the @StableReplay semantic that Kenn proposed a month or so ago
> >> > >> >> >> >> is what is needed here.
> >> > >> >> >> >>
> >> > >> >> >> >> Essentially it will ensure that the GroupByKey iterable is stable and
> >> > >> >> >> >> checkpointed. So on replay, the GroupByKey is guaranteed to receive the
> >> > >> >> >> >> exact same iterable as it did before. The annotation can be put on a
> >> > >> >> >> >> ParDo as well, in which case it ensures stability (and checkpointing) of
> >> > >> >> >> >> the individual ParDo elements.
> >> > >> >> >> >>
> >> > >> >> >> >> Reuven
> >> > >> >> >> >>
> >> > >> >> >> >> On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau
> >> > >> >> >> >> <rmannibu...@gmail.com>
> >> > >> >> >> >> wrote:
> >> > >> >> >> >>
> >> > >> >> >> >>> 2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
> >> > >> >> >> >>>>
> >> > >> >> >> >>>> Hi Romain,
> >> > >> >> >> >>>>
> >> > >> >> >> >>>> You are right: currently, the chunking is related to bundles. Today, the
> >> > >> >> >> >>>> bundle size is the runner's responsibility.
> >> > >> >> >> >>>>
> >> > >> >> >> >>>> I think it's fine because only the runner knows an efficient bundle size.
> >> > >> >> >> >>>> I'm afraid giving "control" of the bundle size to the end user (via the
> >> > >> >> >> >>>> pipeline) can result in huge performance issues depending on the runner.
> >> > >> >> >> >>>>
> >> > >> >> >> >>>> It doesn't mean that we can't use an uber layer: it's what we do in
> >> > >> >> >> >>>> ParDoWithBatch or DoFn in IO Sink where we have a batch size.
> >> > >> >> >> >>>>
> >> > >> >> >> >>>> Anyway, the core problem is about the checkpoint: why is a checkpoint
> >> > >> >> >> >>>> not "respected" by an IO or runner?
> >> > >> >> >> >>>
> >> > >> >> >> >>>
> >> > >> >> >> >>>
> >> > >> >> >> >>> Take the example of a runner deciding the bundle size is 4 and the IO
> >> > >> >> >> >>> deciding the commit-interval (batch semantic) is 2: what happens if
> >> > >> >> >> >>> the 3rd record fails? You have pushed 2 records to the store, which can
> >> > >> >> >> >>> be reprocessed by a restart of the bundle, and you can get duplicates.
> >> > >> >> >> >>>
> >> > >> >> >> >>> Rephrased: I think we need, as a framework, a batch/chunk solution which
> >> > >> >> >> >>> is reliable. I understand bundles are mapped onto the runner and not
> >> > >> >> >> >>> really controlled, but can we get something more reliable for the user?
> >> > >> >> >> >>> Maybe we need a @BeforeBatch or something like that.
> >> > >> >> >> >>>
> >> > >> >> >> >>>>
> >> > >> >> >> >>>> Regards
> >> > >> >> >> >>>> JB
> >> > >> >> >> >>>>
> >> > >> >> >> >>>>
> >> > >> >> >> >>>> On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:
> >> > >> >> >> >>>>>
> >> > >> >> >> >>>>>
> >> > >> >> >> >>>>> Hi guys,
> >> > >> >> >> >>>>>
> >> > >> >> >> >>>>> The subject is a bit provocative but the topic is real and keeps coming
> >> > >> >> >> >>>>> up again and again with beam usage: how can a dofn handle some
> >> > >> >> >> >>>>> "chunking"?
> >> > >> >> >> >>>>>
> >> > >> >> >> >>>>> The need is to be able to commit every N records, with N not too big.
> >> > >> >> >> >>>>>
> >> > >> >> >> >>>>> The natural API for that in beam is the bundle one, but bundles are not
> >> > >> >> >> >>>>> reliable since they can be very small (flink) - we can say it is "ok"
> >> > >> >> >> >>>>> even if it has some perf impacts - or too big (spark does full size /
> >> > >> >> >> >>>>> #workers).
> >> > >> >> >> >>>>>
> >> > >> >> >> >>>>> The workaround is what we see in the ES I/O: a maxSize which does an
> >> > >> >> >> >>>>> eager flush. The issue is that the checkpoint is then not respected
> >> > >> >> >> >>>>> and you can process the same records multiple times.
> >> > >> >> >> >>>>>
> >> > >> >> >> >>>>> Any plan to make this API reliable and controllable from a beam point
> >> > >> >> >> >>>>> of view (at least in a max manner)?
> >> > >> >> >> >>>>>
> >> > >> >> >> >>>>> Thanks,
> >> > >> >> >> >>>>> Romain Manni-Bucau
> >> > >> >> >> >>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >> > >> >> >> >>>>>
> >> > >> >> >> >>>>
> >> > >> >> >> >>>> --
> >> > >> >> >> >>>> Jean-Baptiste Onofré
> >> > >> >> >> >>>> jbono...@apache.org
> >> > >> >> >> >>>> http://blog.nanthrax.net
> >> > >> >> >> >>>> Talend - http://www.talend.com
> >> > >> >> >> >>>
> >> > >> >> >> >>>
> >> > >> >> >> >>
> >> > >> >> >> >
> >> > >> >> >> > --
> >> > >> >> >> > Jean-Baptiste Onofré
> >> > >> >> >> > jbono...@apache.org
> >> > >> >> >> > http://blog.nanthrax.net
> >> > >> >> >> > Talend - http://www.talend.com
> >> > >> >> >>
> >> > >> >>
> >> > >>
> >> >
> >>
>
