In case the connection is not clear to folks on this thread, I pinged the thread on @StableReplay / @RequiresStableInput / etc., and opened a draft PR at https://github.com/apache/beam/pull/4135.
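
For anyone who hasn't followed that thread, here is a minimal sketch of how the annotation might be used on a DoFn. The annotation name and exact semantics are what the PR is meant to settle, and the element types and the writeToBackend helper below are placeholders, so treat this as illustrative only:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    class WriteBatchFn extends DoFn<KV<Integer, Iterable<String>>, Void> {
      @ProcessElement
      @RequiresStableInput  // or @StableReplay, per the naming discussion below
      public void processElement(ProcessContext c) {
        // The runner must checkpoint the input before this call, so on retry the
        // method sees exactly the same Iterable and the write can safely be redone.
        writeToBackend(c.element().getValue());  // writeToBackend is a placeholder
      }

      private void writeToBackend(Iterable<String> batch) {
        // backend-specific write, omitted
      }
    }

The point of the annotation is only the replay guarantee; how the batches get formed in the first place is the subject of the quoted thread below.
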
On Wed, Nov 15, 2017 at 3:24 AM, Reuven Lax <[email protected]> wrote:
> So I think the following will do exactly that and can be easily factored
> into a reusable transform (modulo Java type boilerplate):
>
> pCollection
>     .apply(WithKeys.of((Element e) -> ThreadLocalRandom.current().nextInt(N)))
>     .apply(Window.into(new GlobalWindows())
>         .triggering(AfterWatermark.pastEndOfWindow()
>             .withEarlyFirings(AfterPane.elementCountAtLeast(100))))
>     .apply(GroupByKey.create())
>     .apply(ParDo.of(new DoFn<>() {
>       @ProcessElement
>       @StableReplay
>       public void processElement(ProcessContext c) {
>         // Insert c.element().getValue() into backend.
>       }
>     }));
>
> On Wed, Nov 15, 2017 at 7:00 PM, Romain Manni-Bucau <[email protected]> wrote:
> > 2017-11-15 11:42 GMT+01:00 Reuven Lax <[email protected]>:
> > > Can we describe this at a higher level?
> > >
> > > I think what you want is the following. Please correct if I'm misunderstanding.
> > >
> > > Batches of 100 elements (is this a hard requirement, or do they have to be
> > > "approximately" 100 elements?)
> >
> > Approximately is fine as long as it is documented (what is not fine is
> > 1,000,000 instead of 10, for instance).
> >
> > > Once you see a batch, you're guaranteed to see the same batch on retries.
> >
> > +1
> >
> > > You want to then idempotently insert this batch into some backend. Things
> > > may fail, workers may crash, but in that case you want to get the exact
> > > same batch back so you can insert it again.
> >
> > +1
> >
> > > Do you care about ordering? On failure do you have to see the same batches
> > > in the same order as before, or is it sufficient to see the same batches?
> >
> > Beam doesn't guarantee ordering everywhere, so I guess it is not important -
> > at least for my cases that is true.
> >
> > > Reuven
> > >
> > > On Wed, Nov 15, 2017 at 5:58 PM, Romain Manni-Bucau <[email protected]> wrote:
> > > > Overall goal is to ensure that every 100 elements max, a "backend" (e.g. a
> > > > datastore) flush/commit/push is done and is aligned with Beam checkpoints.
> > > > You can see it as bringing the "general" commit-interval notion to Beam
> > > > and kind of getting rid of the bundle notion, which is almost impossible
> > > > to use today.
> > > >
> > > > Romain Manni-Bucau
> > > > @rmannibucau | Blog | Old Blog | Github | LinkedIn
> > > >
> > > > 2017-11-15 10:27 GMT+01:00 Reuven Lax <[email protected]>:
> > > > > It's in the dev list archives, not sure if there's a doc yet.
> > > > >
> > > > > I'm not quite sure I understand what you mean by a "flush". Can you
> > > > > describe the problem you're trying to solve?
> > > > >
> > > > > Reuven
> > > > >
> > > > > On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau <[email protected]> wrote:
> > > > > > Hmm, I didn't find the doc - if you have the link handy it would be
> > > > > > appreciated - but "before" sounds not enough, it should be "after" in
> > > > > > case there was a "flush", no?
> > > > > >
> > > > > > Romain Manni-Bucau
> > > > > > @rmannibucau | Blog | Old Blog | Github | LinkedIn
> > > > > >
> > > > > > 2017-11-15 10:10 GMT+01:00 Reuven Lax <[email protected]>:
> > > > > > > If you set @StableReplay before a ParDo, it forces a checkpoint
> > > > > > > before that ParDo.
> > > > > > >
> > > > > > > On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau <[email protected]> wrote:
> > > > > > > > It sounds like a good start. I'm not sure how a group by key (and not
> > > > > > > > by size) can help control the checkpointing interval. I wonder if we
> > > > > > > > shouldn't be able to have a CheckpointPolicy { boolean shouldCheckpoint() }
> > > > > > > > used in the processing event loop. The default could be up to the runner,
> > > > > > > > but if set on the transform (or DoFn) it would be used to control when
> > > > > > > > the checkpoint is done. Thinking out loud, it sounds close to the JBatch
> > > > > > > > checkpoint algorithm
> > > > > > > > (https://docs.oracle.com/javaee/7/api/javax/batch/api/chunk/CheckpointAlgorithm.html).
> > > > > > > >
> > > > > > > > Romain Manni-Bucau
> > > > > > > > @rmannibucau | Blog | Old Blog | Github | LinkedIn
> > > > > > > >
> > > > > > > > 2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré <[email protected]>:
> > > > > > > > > Yes, @StableReplay, that's the annotation. Thanks.
> > > > > > > > >
> > > > > > > > > On 11/15/2017 09:52 AM, Reuven Lax wrote:
> > > > > > > > > > Romain,
> > > > > > > > > >
> > > > > > > > > > I think the @StableReplay semantic that Kenn proposed a month or
> > > > > > > > > > so ago is what is needed here.
> > > > > > > > > >
> > > > > > > > > > Essentially it will ensure that the GroupByKey iterable is stable
> > > > > > > > > > and checkpointed. So on replay, the GroupByKey is guaranteed to
> > > > > > > > > > receive the exact same iterable as it did before. The annotation
> > > > > > > > > > can be put on a ParDo as well, in which case it ensures stability
> > > > > > > > > > (and checkpointing) of the individual ParDo elements.
> > > > > > > > > >
> > > > > > > > > > Reuven
> > > > > > > > > >
> > > > > > > > > > On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau <[email protected]> wrote:
> > > > > > > > > > > 2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré <[email protected]>:
> > > > > > > > > > > > Hi Romain,
> > > > > > > > > > > >
> > > > > > > > > > > > You are right: currently, the chunking is related to bundles.
> > > > > > > > > > > > Today, the bundle size is under the runner's responsibility.
> > > > > > > > > > > >
> > > > > > > > > > > > I think that's fine because only the runner knows an efficient
> > > > > > > > > > > > bundle size. I'm afraid giving "control" of the bundle size to
> > > > > > > > > > > > the end user (via the pipeline) can result in huge performance
> > > > > > > > > > > > issues depending on the runner.
> > > > > > > > > > > >
> > > > > > > > > > > > It doesn't mean that we can't use an uber layer: it's what we do
> > > > > > > > > > > > in ParDoWithBatch or DoFn in IO sinks where we have a batch size.
> > > > > > > > > > > >
> > > > > > > > > > > > Anyway, the core problem is about the checkpoint: why is a
> > > > > > > > > > > > checkpoint not "respected" by an IO or runner?
> > > > > > > > > > >
> > > > > > > > > > > Take the example of a runner deciding the bundle size is 4 and the
> > > > > > > > > > > IO deciding the commit-interval (batch semantic) is 2: what happens
> > > > > > > > > > > if the 3rd record fails? You have pushed 2 records to the store
> > > > > > > > > > > which can be reprocessed by a restart of the bundle, and you can
> > > > > > > > > > > get duplicates.
> > > > > > > > > > >
> > > > > > > > > > > Rephrased: I think we need, as a framework, a batch/chunk solution
> > > > > > > > > > > which is reliable. I understand bundles are mapped to the runner
> > > > > > > > > > > and not really controlled, but can we get something more reliable
> > > > > > > > > > > for the user? Maybe we need a @BeforeBatch or something like that.
> > > > > > > > > > >
> > > > > > > > > > > > Regards
> > > > > > > > > > > > JB
> > > > > > > > > > > >
> > > > > > > > > > > > On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:
> > > > > > > > > > > > > Hi guys,
> > > > > > > > > > > > >
> > > > > > > > > > > > > The subject is a bit provocative but the topic is real and
> > > > > > > > > > > > > comes up again and again with Beam usage: how can a DoFn
> > > > > > > > > > > > > handle some "chunking"?
> > > > > > > > > > > > >
> > > > > > > > > > > > > The need is to be able to commit every N records, with N not
> > > > > > > > > > > > > too big.
> > > > > > > > > > > > >
> > > > > > > > > > > > > The natural API for that in Beam is the bundle one, but bundles
> > > > > > > > > > > > > are not reliable since they can be very small (Flink) - we can
> > > > > > > > > > > > > say it is "ok" even if it has some perf impact - or too big
> > > > > > > > > > > > > (Spark does full size / #workers).
> > > > > > > > > > > > >
> > > > > > > > > > > > > The workaround is what we see in the ES I/O: a maxSize which
> > > > > > > > > > > > > does an eager flush. The issue is that then the checkpoint is
> > > > > > > > > > > > > not respected and you can process the same records multiple
> > > > > > > > > > > > > times.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Any plan to make this API reliable and controllable from a Beam
> > > > > > > > > > > > > point of view (at least in a max manner)?
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Romain Manni-Bucau
> > > > > > > > > > > > > @rmannibucau | Blog | Old Blog | Github | LinkedIn
> > > > > > > > > > > >
> > > > > > > > > > > > --
> > > > > > > > > > > > Jean-Baptiste Onofré
> > > > > > > > > > > > [email protected]
> > > > > > > > > > > > http://blog.nanthrax.net
> > > > > > > > > > > > Talend - http://www.talend.com
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > Jean-Baptiste Onofré
> > > > > > > > > [email protected]
> > > > > > > > > http://blog.nanthrax.net
> > > > > > > > > Talend - http://www.talend.com
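
To make the "easily factored into a reusable transform" remark in Reuven's snippet above concrete, one possible factoring is sketched below. The transform name and its numShards/batchSize parameters are invented for illustration, and the allowed-lateness and accumulation-mode lines are filled in by assumption since the original snippet elides them; the trigger itself is kept as Reuven wrote it:

    import java.util.concurrent.ThreadLocalRandom;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.Values;
    import org.apache.beam.sdk.transforms.WithKeys;
    import org.apache.beam.sdk.transforms.windowing.AfterPane;
    import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptors;
    import org.joda.time.Duration;

    // Hypothetical name and parameters, for illustration only.
    class BatchByRandomKey<T> extends PTransform<PCollection<T>, PCollection<Iterable<T>>> {
      private final int numShards;
      private final int batchSize;

      BatchByRandomKey(int numShards, int batchSize) {
        this.numShards = numShards;
        this.batchSize = batchSize;
      }

      @Override
      public PCollection<Iterable<T>> expand(PCollection<T> input) {
        final int shards = numShards;
        return input
            // Spread elements over random keys so batches can form in parallel.
            .apply(WithKeys.of((T e) -> ThreadLocalRandom.current().nextInt(shards))
                .withKeyType(TypeDescriptors.integers()))
            .apply(Window.<KV<Integer, T>>into(new GlobalWindows())
                .triggering(AfterWatermark.pastEndOfWindow()
                    .withEarlyFirings(AfterPane.elementCountAtLeast(batchSize)))
                // Accumulation mode and allowed lateness must be set once a trigger is.
                .discardingFiredPanes()
                .withAllowedLateness(Duration.ZERO))
            // Each early firing groups roughly batchSize elements per key.
            .apply(GroupByKey.<Integer, T>create())
            // Drop the synthetic key; the @StableReplay/@RequiresStableInput DoFn from
            // Reuven's snippet would then consume each Iterable<T> batch.
            .apply(Values.<Iterable<T>>create());
      }
    }

The transform only shapes the batches; the replay guarantee the thread is about still comes from the annotation on the DoFn that consumes them.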

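Separately, for the "flush every N records at most" control Romain asks about, a per-key buffer in the state API is one way to bound chunk size today. The class and parameter names below are placeholders, the timer-based flush of incomplete chunks is omitted for brevity, and note that this only bounds the chunk size; it does not by itself give the stable-replay guarantee discussed above:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.coders.VarIntCoder;
    import org.apache.beam.sdk.state.BagState;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Placeholder name; buffers up to maxChunkSize values per key and emits them as one chunk.
    class ChunkFn extends DoFn<KV<String, String>, Iterable<String>> {
      private final int maxChunkSize;

      ChunkFn(int maxChunkSize) {
        this.maxChunkSize = maxChunkSize;
      }

      @StateId("buffer")
      private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag(StringUtf8Coder.of());

      @StateId("count")
      private final StateSpec<ValueState<Integer>> countSpec = StateSpecs.value(VarIntCoder.of());

      @ProcessElement
      public void processElement(
          ProcessContext c,
          @StateId("buffer") BagState<String> buffer,
          @StateId("count") ValueState<Integer> count) {
        buffer.add(c.element().getValue());
        Integer read = count.read();
        int size = (read == null ? 0 : read) + 1;
        if (size >= maxChunkSize) {
          List<String> chunk = new ArrayList<>();
          for (String value : buffer.read()) {
            chunk.add(value);
          }
          c.output(chunk);   // downstream write sees at most maxChunkSize records per call
          buffer.clear();
          count.write(0);
        } else {
          count.write(size);
        }
      }
    }

This is essentially the "maxSize eager flush" idea from the ES I/O, expressed with keyed state instead of an instance field; making the flush consistent across retries is still the job of @StableReplay/@RequiresStableInput.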