2017-11-15 11:42 GMT+01:00 Reuven Lax <re...@google.com.invalid>: > Can we describe this at a higher level? > > I think what you want is the following. Please correct if I'm > misunderstanding. > > Batches of 100 elements (is this a hard requirement, or do they have to be > "approximately" 100 element?)
Approximately is fine while documented (what is not is 1000000 instead of 10 for instance) > > Once you see a batch, you're guaranteed to see the same batch on retries. +1 > > You want to then idempotently insert this batch into some backend. Things > may fail, workers may crash, but in that case you want to get the exact > same batch back so you can insert it again. +1 > > Do you care about ordering? On failure do you have to see the same batches > in the same order as before, or is it sufficient to see the same batches? Beam doesnt everywhere so I guess it is not important - at least for my cases this statement is true. > > Reuven > > On Wed, Nov 15, 2017 at 5:58 PM, Romain Manni-Bucau <rmannibu...@gmail.com> > wrote: > >> Overall goal is to ensure each 100 elements max, a "backend" (as >> datastore) flush/commit/push is done and is aligned with beam >> checkpoints. You can see it as bringing the "general" commit-interval >> notion to beam and kind of get rid of the bundle notion which is >> almost impossible to use today. >> >> Romain Manni-Bucau >> @rmannibucau | Blog | Old Blog | Github | LinkedIn >> >> >> 2017-11-15 10:27 GMT+01:00 Reuven Lax <re...@google.com.invalid>: >> > It's in the dev list archives, not sure if there's a doc yet. >> > >> > I'm not quite sure I understand what you mean by a "flush" Can you >> describe >> > the problem you're trying to solve? >> > >> > Reuven >> > >> > On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau < >> rmannibu...@gmail.com> >> > wrote: >> > >> >> Hmm, I didn't find the doc - if you have the link not far it would be >> >> appreciated - but "before" sounds not enough, it should be "after" in >> >> case there was a "flush" no? >> >> >> >> Romain Manni-Bucau >> >> @rmannibucau | Blog | Old Blog | Github | LinkedIn >> >> >> >> >> >> 2017-11-15 10:10 GMT+01:00 Reuven Lax <re...@google.com.invalid>: >> >> > If you set @StableReplay before a ParDo, it forces a checkpoint before >> >> that >> >> > ParDo. >> >> > >> >> > On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau < >> >> rmannibu...@gmail.com> >> >> > wrote: >> >> > >> >> >> It sounds a good start. I'm not sure how a group by key (and not by >> >> >> size) can help controlling the checkpointing interval. Wonder if we >> >> >> shouldn't be able to have a CheckpointPolicy { boolean >> >> >> shouldCheckpoint() } used in the processing event loop. Default could >> >> >> be up to the runner but if set on the transform (or dofn) it would be >> >> >> used to control when the checkpoint is done. Thinking out loud it >> >> >> sounds close to jbatch checkpoint algorithm >> >> >> (https://docs.oracle.com/javaee/7/api/javax/batch/api/ >> >> >> chunk/CheckpointAlgorithm.html) >> >> >> >> >> >> Romain Manni-Bucau >> >> >> @rmannibucau | Blog | Old Blog | Github | LinkedIn >> >> >> >> >> >> >> >> >> 2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>: >> >> >> > Yes, @StableReplay, that's the annotation. Thanks. >> >> >> > >> >> >> > >> >> >> > On 11/15/2017 09:52 AM, Reuven Lax wrote: >> >> >> >> >> >> >> >> Romain, >> >> >> >> >> >> >> >> I think the @StableReplay semantic that Kenn proposed a month or >> so >> >> ago >> >> >> is >> >> >> >> what is needed here. >> >> >> >> >> >> >> >> Essentially it will ensure that the GroupByKey iterable is stable >> and >> >> >> >> checkpointed. So on replay, the GroupByKey is guaranteed to >> receive >> >> the >> >> >> >> exact same iterable as it did before. The annotation can be put >> on a >> >> >> ParDo >> >> >> >> as well, in which case it ensures stability (and checkpointing) of >> >> the >> >> >> >> individual ParDo elements. >> >> >> >> >> >> >> >> Reuven >> >> >> >> >> >> >> >> On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau >> >> >> >> <rmannibu...@gmail.com> >> >> >> >> wrote: >> >> >> >> >> >> >> >>> 2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net >> >: >> >> >> >>>> >> >> >> >>>> Hi Romain, >> >> >> >>>> >> >> >> >>>> You are right: currently, the chunking is related to bundles. >> >> Today, >> >> >> the >> >> >> >>>> bundle size is under the runner responsibility. >> >> >> >>>> >> >> >> >>>> I think it's fine because only the runner know an efficient >> bundle >> >> >> size. >> >> >> >>> >> >> >> >>> I'm >> >> >> >>>> >> >> >> >>>> afraid giving the "control" of the bundle size to the end user >> (via >> >> >> >>>> pipeline) can result to huge performances issue depending of the >> >> >> runner. >> >> >> >>>> >> >> >> >>>> It doesn't mean that we can't use an uber layer: it's what we >> do in >> >> >> >>>> ParDoWithBatch or DoFn in IO Sink where we have a batch size. >> >> >> >>>> >> >> >> >>>> Anyway, the core problem is about the checkpoint: why a >> checkpoint >> >> is >> >> >> >>>> not >> >> >> >>>> "respected" by an IO or runner ? >> >> >> >>> >> >> >> >>> >> >> >> >>> >> >> >> >>> Take the example of a runner deciding the bundle size is 4 and >> the >> >> IO >> >> >> >>> deciding the commit-interval (batch semantic) is 2, what happens >> if >> >> >> >>> the 3rd record fails? You have pushed to the store 2 records >> which >> >> can >> >> >> >>> be reprocessed by a restart of the bundle and you can get >> >> duplicates. >> >> >> >>> >> >> >> >>> Rephrased: I think we need as a framework a batch/chunk solution >> >> which >> >> >> >>> is reliable. I understand bundles is mapped on the runner and not >> >> >> >>> really controlled but can we get something more reliable for the >> >> user? >> >> >> >>> Maybe we need a @BeforeBatch or something like that. >> >> >> >>> >> >> >> >>>> >> >> >> >>>> Regards >> >> >> >>>> JB >> >> >> >>>> >> >> >> >>>> >> >> >> >>>> On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote: >> >> >> >>>>> >> >> >> >>>>> >> >> >> >>>>> Hi guys, >> >> >> >>>>> >> >> >> >>>>> The subject is a bit provocative but the topic is real and >> coming >> >> >> >>>>> again and again with the beam usage: how a dofn can handle some >> >> >> >>>>> "chunking". >> >> >> >>>>> >> >> >> >>>>> The need is to be able to commit each N records but with N not >> too >> >> >> big. >> >> >> >>>>> >> >> >> >>>>> The natural API for that in beam is the bundle one but bundles >> are >> >> >> not >> >> >> >>>>> reliable since they can be very small (flink) - we can say it >> is >> >> "ok" >> >> >> >>>>> even if it has some perf impacts - or too big (spark does full >> >> size / >> >> >> >>>>> #workers). >> >> >> >>>>> >> >> >> >>>>> The workaround is what we see in the ES I/O: a maxSize which >> does >> >> an >> >> >> >>>>> eager flush. The issue is that then the checkpoint is not >> >> respected >> >> >> >>>>> and you can process multiple times the same records. >> >> >> >>>>> >> >> >> >>>>> Any plan to make this API reliable and controllable from a beam >> >> point >> >> >> >>>>> of view (at least in a max manner)? >> >> >> >>>>> >> >> >> >>>>> Thanks, >> >> >> >>>>> Romain Manni-Bucau >> >> >> >>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn >> >> >> >>>>> >> >> >> >>>> >> >> >> >>>> -- >> >> >> >>>> Jean-Baptiste Onofré >> >> >> >>>> jbono...@apache.org >> >> >> >>>> http://blog.nanthrax.net >> >> >> >>>> Talend - http://www.talend.com >> >> >> >>> >> >> >> >>> >> >> >> >> >> >> >> > >> >> >> > -- >> >> >> > Jean-Baptiste Onofré >> >> >> > jbono...@apache.org >> >> >> > http://blog.nanthrax.net >> >> >> > Talend - http://www.talend.com >> >> >> >> >> >>