So I think the following will do exactly that, and it can easily be
factored into a reusable transform (modulo Java type boilerplate):

pCollection
    .apply(WithKeys.of((Element e) -> ThreadLocalRandom.current().nextInt(N))
        .withKeyType(TypeDescriptors.integers()))
    .apply(Window.<KV<Integer, Element>>into(new GlobalWindows())
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterPane.elementCountAtLeast(100)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply(GroupByKey.create())
    .apply(ParDo.of(new DoFn<KV<Integer, Iterable<Element>>, Void>() {
        @ProcessElement
        @StableReplay
        public void processElement(ProcessContext c) {
            // Insert c.element().getValue() into the backend.
        }
    }));
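
Factored out, a sketch could look like the following (GroupIntoFixedBatches
is a made-up name for illustration, the usual PTransform/PCollection/KV
imports are assumed, and @StableReplay itself is still only a proposal):

// Hypothetical reusable transform wrapping the snippet above.
public static class GroupIntoFixedBatches<T>
    extends PTransform<PCollection<T>, PCollection<KV<Integer, Iterable<T>>>> {

  private final int batchSize;
  private final int numShards;

  public GroupIntoFixedBatches(int batchSize, int numShards) {
    this.batchSize = batchSize;
    this.numShards = numShards;
  }

  @Override
  public PCollection<KV<Integer, Iterable<T>>> expand(PCollection<T> input) {
    return input
        // Spread elements over numShards random keys to keep parallelism.
        .apply(WithKeys.of((T e) -> ThreadLocalRandom.current().nextInt(numShards))
            .withKeyType(TypeDescriptors.integers()))
        // Fire a pane roughly every batchSize elements per key.
        .apply(Window.<KV<Integer, T>>into(new GlobalWindows())
            .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterPane.elementCountAtLeast(batchSize)))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes())
        .apply(GroupByKey.<Integer, T>create());
  }
}

The @StableReplay-annotated DoFn would then consume the resulting
PCollection<KV<Integer, Iterable<T>>> exactly as above.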

On Wed, Nov 15, 2017 at 7:00 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
wrote:

> 2017-11-15 11:42 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
> > Can we describe this at a higher level?
> >
> > I think what you want is the following. Please correct if I'm
> > misunderstanding.
> >
> > Batches of 100 elements (is this a hard requirement, or do they have to
> > be "approximately" 100 elements?)
>
> Approximately is fine as long as it is documented (what is not fine is
> 1000000 instead of 10, for instance)
>
> >
> > Once you see a batch, you're guaranteed to see the same batch on retries.
>
> +1
>
> >
> > You want to then idempotently insert this batch into some backend. Things
> > may fail, workers may crash, but in that case you want to get the exact
> > same batch back so you can insert it again.
>
> +1
>
> >
> > Do you care about ordering? On failure do you have to see the same
> > batches in the same order as before, or is it sufficient to see the same
> > batches?
>
> Beam doesn't guarantee ordering everywhere, so I guess it is not important
> - at least for my cases this statement is true.
>
> >
> > Reuven
> >
> > On Wed, Nov 15, 2017 at 5:58 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
> > wrote:
> >
> >> The overall goal is to ensure that, every 100 elements max, a "backend"
> >> (e.g. a datastore) flush/commit/push is done and is aligned with beam
> >> checkpoints. You can see it as bringing the "general" commit-interval
> >> notion to beam and kind of getting rid of the bundle notion, which is
> >> almost impossible to use today.
> >>
> >> Romain Manni-Bucau
> >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >>
> >>
> >> 2017-11-15 10:27 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
> >> > It's in the dev list archives, not sure if there's a doc yet.
> >> >
> >> > I'm not quite sure I understand what you mean by a "flush". Can you
> >> > describe the problem you're trying to solve?
> >> >
> >> > Reuven
> >> >
> >> > On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
> >> > wrote:
> >> >
> >> >> Hmm, I didn't find the doc - if you have the link handy it would be
> >> >> appreciated - but "before" doesn't sound like enough; it should be
> >> >> "after" in case there was a "flush", no?
> >> >>
> >> >> Romain Manni-Bucau
> >> >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >> >>
> >> >>
> >> >> 2017-11-15 10:10 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
> >> >> > If you set @StableReplay before a ParDo, it forces a checkpoint
> >> >> > before that ParDo.
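> >> >> >
> >> >> > For example (a sketch only - the annotation is still a proposal, and
> >> >> > a String input type and the write helper are assumed here):
> >> >> >
> >> >> > input.apply(ParDo.of(new DoFn<String, Void>() {
> >> >> >     @ProcessElement
> >> >> >     @StableReplay // the element is checkpointed before this runs, so
> >> >> >                   // a retry replays exactly the same element
> >> >> >     public void processElement(ProcessContext c) {
> >> >> >         writeToBackend(c.element()); // hypothetical external write
> >> >> >     }
> >> >> > }));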
> >> >> >
> >> >> > On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
> >> >> > wrote:
> >> >> >
> >> >> >> It sounds like a good start. I'm not sure how a group by key (and
> >> >> >> not by size) can help control the checkpointing interval. I wonder
> >> >> >> if we shouldn't be able to have a CheckpointPolicy { boolean
> >> >> >> shouldCheckpoint() } used in the processing event loop. The default
> >> >> >> could be up to the runner, but if set on the transform (or dofn) it
> >> >> >> would be used to control when the checkpoint is done. Thinking out
> >> >> >> loud, it sounds close to the jbatch checkpoint algorithm
> >> >> >> (https://docs.oracle.com/javaee/7/api/javax/batch/api/chunk/CheckpointAlgorithm.html)
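> >> >> >>
> >> >> >> Something like this rough sketch of the idea (none of these names
> >> >> >> exist in beam today, it is purely hypothetical):
> >> >> >>
> >> >> >> public interface CheckpointPolicy extends java.io.Serializable {
> >> >> >>     // Called after each processed element; returning true asks the
> >> >> >>     // runner to checkpoint before processing the next element.
> >> >> >>     boolean shouldCheckpoint();
> >> >> >> }
> >> >> >>
> >> >> >> // An item-count policy, close to jbatch's checkpoint semantics:
> >> >> >> public class CountingCheckpointPolicy implements CheckpointPolicy {
> >> >> >>     private final int interval;
> >> >> >>     private int count = 0;
> >> >> >>
> >> >> >>     public CountingCheckpointPolicy(int interval) {
> >> >> >>         this.interval = interval;
> >> >> >>     }
> >> >> >>
> >> >> >>     @Override
> >> >> >>     public boolean shouldCheckpoint() {
> >> >> >>         return ++count % interval == 0;
> >> >> >>     }
> >> >> >> }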
> >> >> >>
> >> >> >> Romain Manni-Bucau
> >> >> >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >> >> >>
> >> >> >>
> >> >> >> 2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
> >> >> >> > Yes, @StableReplay, that's the annotation. Thanks.
> >> >> >> >
> >> >> >> >
> >> >> >> > On 11/15/2017 09:52 AM, Reuven Lax wrote:
> >> >> >> >>
> >> >> >> >> Romain,
> >> >> >> >>
> >> >> >> >> I think the @StableReplay semantic that Kenn proposed a month or
> >> >> >> >> so ago is what is needed here.
> >> >> >> >>
> >> >> >> >> Essentially it will ensure that the GroupByKey iterable is stable
> >> >> >> >> and checkpointed. So on replay, the GroupByKey is guaranteed to
> >> >> >> >> receive the exact same iterable as it did before. The annotation
> >> >> >> >> can be put on a ParDo as well, in which case it ensures stability
> >> >> >> >> (and checkpointing) of the individual ParDo elements.
> >> >> >> >>
> >> >> >> >> Reuven
> >> >> >> >>
> >> >> >> >> On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau
> >> >> >> >> <rmannibu...@gmail.com>
> >> >> >> >> wrote:
> >> >> >> >>
> >> >> >> >>> 2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
> >> >> >> >>>>
> >> >> >> >>>> Hi Romain,
> >> >> >> >>>>
> >> >> >> >>>> You are right: currently, the chunking is related to bundles.
> >> >> >> >>>> Today, the bundle size is under the runner's responsibility.
> >> >> >> >>>>
> >> >> >> >>>> I think it's fine because only the runner knows an efficient
> >> >> >> >>>> bundle size. I'm afraid giving the "control" of the bundle size
> >> >> >> >>>> to the end user (via the pipeline) can result in huge
> >> >> >> >>>> performance issues depending on the runner.
> >> >> >> >>>>
> >> >> >> >>>> It doesn't mean that we can't use an uber layer: it's what we
> >> do in
> >> >> >> >>>> ParDoWithBatch or DoFn in IO Sink where we have a batch size.
> >> >> >> >>>>
> >> >> >> >>>> Anyway, the core problem is about the checkpoint: why is a
> >> >> >> >>>> checkpoint not "respected" by an IO or runner?
> >> >> >> >>>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >> >>> Take the example of a runner deciding the bundle size is 4 and
> >> >> >> >>> the IO deciding the commit-interval (batch semantic) is 2: what
> >> >> >> >>> happens if the 3rd record fails? You have already pushed 2
> >> >> >> >>> records to the store, which can be reprocessed by a restart of
> >> >> >> >>> the bundle, so you can get duplicates.
> >> >> >> >>>
> >> >> >> >>> Rephrased: I think we need, as a framework, a reliable
> >> >> >> >>> batch/chunk solution. I understand bundles are mapped on the
> >> >> >> >>> runner and not really controlled, but can we get something more
> >> >> >> >>> reliable for the user? Maybe we need a @BeforeBatch or something
> >> >> >> >>> like that.
> >> >> >> >>>
> >> >> >> >>>>
> >> >> >> >>>> Regards
> >> >> >> >>>> JB
> >> >> >> >>>>
> >> >> >> >>>>
> >> >> >> >>>> On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:
> >> >> >> >>>>>
> >> >> >> >>>>>
> >> >> >> >>>>> Hi guys,
> >> >> >> >>>>>
> >> >> >> >>>>> The subject is a bit provocative but the topic is real and
> >> >> >> >>>>> comes up again and again with beam usage: how can a dofn
> >> >> >> >>>>> handle some "chunking"?
> >> >> >> >>>>>
> >> >> >> >>>>> The need is to be able to commit every N records, but with N
> >> >> >> >>>>> not too big.
> >> >> >> >>>>>
> >> >> >> >>>>> The natural API for that in beam is the bundle one, but
> >> >> >> >>>>> bundles are not reliable since they can be very small (flink)
> >> >> >> >>>>> - we can say it is "ok" even if it has some perf impact - or
> >> >> >> >>>>> too big (spark does full size / #workers).
> >> >> >> >>>>>
> >> >> >> >>>>> The workaround is what we see in the ES I/O: a maxSize which
> >> >> >> >>>>> does an eager flush. The issue is that the checkpoint is then
> >> >> >> >>>>> not respected and you can process the same records multiple
> >> >> >> >>>>> times.
> >> >> >> >>>>>
> >> >> >> >>>>> Any plan to make this API reliable and controllable from a
> >> >> >> >>>>> beam point of view (at least in a max manner)?
> >> >> >> >>>>>
> >> >> >> >>>>> Thanks,
> >> >> >> >>>>> Romain Manni-Bucau
> >> >> >> >>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >> >> >> >>>>>
> >> >> >> >>>>
> >> >> >> >>>> --
> >> >> >> >>>> Jean-Baptiste Onofré
> >> >> >> >>>> jbono...@apache.org
> >> >> >> >>>> http://blog.nanthrax.net
> >> >> >> >>>> Talend - http://www.talend.com
> >> >> >> >>>
> >> >> >> >>>
> >> >> >> >>
> >> >> >> >
> >> >> >> > --
> >> >> >> > Jean-Baptiste Onofré
> >> >> >> > jbono...@apache.org
> >> >> >> > http://blog.nanthrax.net
> >> >> >> > Talend - http://www.talend.com
> >> >> >>
> >> >>
> >>
>
