@Reuven: it looks like a good workaround. @Ken: thanks a lot for the link!

@all:
1. do you think it is doable without using windowing (to have something more reliable in terms of runners, since it would depend on fewer primitives)?
2. what about allowing the user to define when to checkpoint?
3. can we get this kind of "composite" pattern into the Beam core? A rough sketch of what I have in mind follows.
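To make 3. concrete, here is an untested sketch of how Reuven's snippet below could be factored into a reusable composite transform. StableBatches and its parameters are names invented for this sketch (nothing like it exists in the Beam core today), and it adds the withKeyType and accumulation-mode boilerplate the snippet omits:

import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// Hypothetical composite: groups the input into batches of approximately
// batchSize elements so that a downstream @StableReplay ParDo sees the very
// same batch again on retry.
public class StableBatches<T>
    extends PTransform<PCollection<T>, PCollection<KV<Integer, Iterable<T>>>> {

  private final int batchSize;
  private final int numShards;

  private StableBatches(int batchSize, int numShards) {
    this.batchSize = batchSize;
    this.numShards = numShards;
  }

  public static <T> StableBatches<T> of(int batchSize, int numShards) {
    return new StableBatches<>(batchSize, numShards);
  }

  @Override
  public PCollection<KV<Integer, Iterable<T>>> expand(PCollection<T> input) {
    return input
        // Random sharding keeps parallelism while bounding each batch.
        .apply(WithKeys.of((T e) -> ThreadLocalRandom.current().nextInt(numShards))
            .withKeyType(TypeDescriptors.integers()))
        // Emit a pane roughly every batchSize elements per shard.
        .apply(Window.<KV<Integer, T>>into(new GlobalWindows())
            .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterPane.elementCountAtLeast(batchSize)))
            .discardingFiredPanes())
        .apply(GroupByKey.<Integer, T>create());
  }
}

Usage would then be input.apply(StableBatches.of(100, 16)).apply(ParDo.of(new InsertFn())) with the proposed @StableReplay on InsertFn, so each fired pane would be replayed identically on failure.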
Romain Manni-Bucau
@rmannibucau | Blog | Old Blog | Github | LinkedIn

2017-11-15 14:12 GMT+01:00 Kenneth Knowles <k...@google.com.invalid>:
> In case the connection is not clear to folks on this thread, I pinged the
> thread on @StableReplay / @RequiresStableInput / etc and opened a draft PR
> at https://github.com/apache/beam/pull/4135.
>
> On Wed, Nov 15, 2017 at 3:24 AM, Reuven Lax <re...@google.com.invalid> wrote:
>
>> so I think the following will do exactly that and can be easily factored
>> into a reusable transform (modulo Java type boilerplate):
>>
>> pCollection
>>     .apply(WithKeys.of((Element e) -> ThreadLocalRandom.current().nextInt(N)))
>>     .apply(Window.into(new GlobalWindows())
>>         .triggering(AfterWatermark.pastEndOfWindow()
>>             .withEarlyFirings(AfterPane.elementCountAtLeast(100))))
>>     .apply(GroupByKey.create())
>>     .apply(ParDo.of(new DoFn<>() {
>>       @ProcessElement
>>       @StableReplay
>>       public void processElement(ProcessContext c) {
>>         // Insert c.element().getValue() into backend.
>>       }
>>     }));
>>
>> On Wed, Nov 15, 2017 at 7:00 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>
>>> 2017-11-15 11:42 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
>>>> Can we describe this at a higher level?
>>>>
>>>> I think what you want is the following. Please correct me if I'm
>>>> misunderstanding.
>>>>
>>>> Batches of 100 elements (is this a hard requirement, or do they have to
>>>> be "approximately" 100 elements?)
>>>
>>> Approximately is fine as long as it is documented (what is not fine is
>>> 1000000 instead of 10, for instance).
>>>
>>>> Once you see a batch, you're guaranteed to see the same batch on retries.
>>>
>>> +1
>>>
>>>> You want to then idempotently insert this batch into some backend. Things
>>>> may fail, workers may crash, but in that case you want to get the exact
>>>> same batch back so you can insert it again.
>>>
>>> +1
>>>
>>>> Do you care about ordering? On failure do you have to see the same
>>>> batches in the same order as before, or is it sufficient to see the same
>>>> batches?
>>>
>>> Beam doesn't guarantee ordering everywhere, so I guess it is not
>>> important - at least for my cases this statement is true.
>>>
>>>> Reuven
>>>>
>>>> On Wed, Nov 15, 2017 at 5:58 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>
>>>>> The overall goal is to ensure that at most every 100 elements a
>>>>> "backend" (e.g. a datastore) flush/commit/push is done, aligned with
>>>>> Beam checkpoints. You can see it as bringing the "general"
>>>>> commit-interval notion to Beam and kind of getting rid of the bundle
>>>>> notion, which is almost impossible to use today.
>>>>>
>>>>> Romain Manni-Bucau
>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>>>>>
>>>>> 2017-11-15 10:27 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
>>>>>> It's in the dev list archives, not sure if there's a doc yet.
>>>>>>
>>>>>> I'm not quite sure I understand what you mean by a "flush". Can you
>>>>>> describe the problem you're trying to solve?
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>
>>>>>>> Hmm, I didn't find the doc - if you have the link handy it would be
>>>>>>> appreciated - but "before" does not sound like enough; it should be
>>>>>>> "after" in case there was a "flush", no?
>>>>>>>
>>>>>>> Romain Manni-Bucau
>>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>>>>>>>
>>>>>>> 2017-11-15 10:10 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
>>>>>>>> If you set @StableReplay before a ParDo, it forces a checkpoint
>>>>>>>> before that ParDo.
>>>>>>>>
>>>>>>>> On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> It sounds like a good start. I'm not sure how a group by key (and
>>>>>>>>> not by size) can help control the checkpointing interval. I wonder
>>>>>>>>> if we shouldn't have a CheckpointPolicy { boolean shouldCheckpoint() }
>>>>>>>>> used in the processing event loop. The default could be up to the
>>>>>>>>> runner, but if set on the transform (or DoFn) it would be used to
>>>>>>>>> control when the checkpoint is done. Thinking out loud, it sounds
>>>>>>>>> close to the JBatch checkpoint algorithm
>>>>>>>>> (https://docs.oracle.com/javaee/7/api/javax/batch/api/chunk/CheckpointAlgorithm.html).
>>>>>>>>>
>>>>>>>>> Romain Manni-Bucau
>>>>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn
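To make the CheckpointPolicy idea above concrete, here is a purely hypothetical sketch - nothing like this exists in Beam today, every name is invented, and it deliberately mirrors JBatch's CheckpointAlgorithm:

import java.io.Serializable;

// Hypothetical API (does not exist in Beam): lets a transform tell the
// runner, from its processing loop, when a checkpoint should be taken.
public interface CheckpointPolicy extends Serializable {

  // Called by the runner after each processed element.
  boolean shouldCheckpoint();

  // Example policy: checkpoint every n elements, i.e. a commit-interval.
  static CheckpointPolicy everyN(int n) {
    return new CheckpointPolicy() {
      private int count;

      @Override
      public boolean shouldCheckpoint() {
        return ++count % n == 0;
      }
    };
  }
}

A transform or DoFn could then opt in with something like .withCheckpointPolicy(CheckpointPolicy.everyN(100)), the default staying up to the runner as today.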
>>>>>>>>> 2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>>>>>>>>>> Yes, @StableReplay, that's the annotation. Thanks.
>>>>>>>>>>
>>>>>>>>>> On 11/15/2017 09:52 AM, Reuven Lax wrote:
>>>>>>>>>>> Romain,
>>>>>>>>>>>
>>>>>>>>>>> I think the @StableReplay semantic that Kenn proposed a month or
>>>>>>>>>>> so ago is what is needed here.
>>>>>>>>>>>
>>>>>>>>>>> Essentially it will ensure that the GroupByKey iterable is stable
>>>>>>>>>>> and checkpointed, so on replay the consumer of the GroupByKey is
>>>>>>>>>>> guaranteed to receive the exact same iterable as it did before.
>>>>>>>>>>> The annotation can be put on a ParDo as well, in which case it
>>>>>>>>>>> ensures stability (and checkpointing) of the individual ParDo
>>>>>>>>>>> elements.
>>>>>>>>>>>
>>>>>>>>>>> Reuven
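To illustrate what the stable iterable buys in practice, a sketch of the intended usage - it will not compile against any released Beam, since @StableReplay only exists in the proposal and draft PR mentioned above, and the backend call is a placeholder:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Sketch only: @StableReplay is the proposed annotation, not a real API yet.
class IdempotentInsertFn extends DoFn<KV<Integer, Iterable<String>>, Void> {

  @ProcessElement
  @StableReplay
  public void processElement(ProcessContext c) {
    // The iterable is guaranteed to be identical on replay, so an id
    // derived from its content is identical too, and the backend can use
    // it to deduplicate the whole batch.
    StringBuilder content = new StringBuilder();
    for (String value : c.element().getValue()) {
      content.append(value).append('\n');
    }
    String batchId = c.element().getKey() + "-" + content.toString().hashCode();
    // backend.upsertBatch(batchId, c.element().getValue()); // placeholder
  }
}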
>>>>>>>>>>> On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> 2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>>>>>>>>>>>>> Hi Romain,
>>>>>>>>>>>>>
>>>>>>>>>>>>> You are right: currently, the chunking is related to bundles.
>>>>>>>>>>>>> Today, the bundle size is under the runner's responsibility.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think that's fine, because only the runner knows an efficient
>>>>>>>>>>>>> bundle size. I'm afraid giving "control" of the bundle size to
>>>>>>>>>>>>> the end user (via the pipeline) could result in huge performance
>>>>>>>>>>>>> issues depending on the runner.
>>>>>>>>>>>>>
>>>>>>>>>>>>> It doesn't mean that we can't use an uber layer: it's what we do
>>>>>>>>>>>>> in ParDoWithBatch or in the DoFn of an IO sink where we have a
>>>>>>>>>>>>> batch size.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Anyway, the core problem is about the checkpoint: why is a
>>>>>>>>>>>>> checkpoint not "respected" by an IO or runner?
>>>>>>>>>>>>
>>>>>>>>>>>> Take the example of a runner deciding the bundle size is 4 and
>>>>>>>>>>>> the IO deciding the commit-interval (batch semantic) is 2: what
>>>>>>>>>>>> happens if the 3rd record fails? You have already pushed 2
>>>>>>>>>>>> records to the store which can be reprocessed by a restart of the
>>>>>>>>>>>> bundle, so you can get duplicates.
>>>>>>>>>>>>
>>>>>>>>>>>> Rephrased: I think we need, as a framework, a batch/chunk
>>>>>>>>>>>> solution which is reliable. I understand bundles are mapped to
>>>>>>>>>>>> the runner and not really controlled, but can we get something
>>>>>>>>>>>> more reliable for the user? Maybe we need a @BeforeBatch or
>>>>>>>>>>>> something like that.
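The failure mode described above is exactly what the eager-flush workaround (the ES I/O maxSize mentioned in the original message below) produces. A simplified sketch, not the actual IO code, with the backend call as a placeholder:

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;

// Simplified sketch of the "maxSize eager flush" pattern. The flush happens
// every MAX_SIZE elements, independently of the runner's bundle and
// checkpoint boundaries: if the bundle is retried after a flush, the
// already-flushed records are written a second time.
class EagerFlushFn extends DoFn<String, Void> {

  private static final int MAX_SIZE = 2; // the IO's commit-interval

  private transient List<String> buffer;

  @StartBundle
  public void startBundle() {
    buffer = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    buffer.add(c.element());
    if (buffer.size() >= MAX_SIZE) {
      flush(); // records reach the store here, before any checkpoint
    }
  }

  @FinishBundle
  public void finishBundle() {
    flush(); // whatever is left when the runner decides the bundle ends
  }

  private void flush() {
    // backend.bulkInsert(buffer); // placeholder
    buffer.clear();
  }
}

With a bundle size of 4 and MAX_SIZE of 2, this reproduces the scenario above: records 1 and 2 are flushed, record 3 fails, the runner retries the bundle from its last checkpoint, and records 1 and 2 are flushed a second time.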
>>>>>>>>>>>>> Regards
>>>>>>>>>>>>> JB
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:
>>>>>>>>>>>>>> Hi guys,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The subject is a bit provocative, but the topic is real and it
>>>>>>>>>>>>>> comes up again and again with Beam usage: how can a DoFn handle
>>>>>>>>>>>>>> some "chunking"?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The need is to be able to commit every N records, with N not
>>>>>>>>>>>>>> too big.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The natural API for that in Beam is the bundle one, but bundles
>>>>>>>>>>>>>> are not reliable since they can be very small (Flink) - we can
>>>>>>>>>>>>>> say it is "ok" even if it has some perf impact - or too big
>>>>>>>>>>>>>> (Spark does full size / #workers).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The workaround is what we see in the ES I/O: a maxSize which
>>>>>>>>>>>>>> does an eager flush. The issue is that the checkpoint is then
>>>>>>>>>>>>>> not respected and you can process the same records multiple
>>>>>>>>>>>>>> times.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Any plan to make this API reliable and controllable from a Beam
>>>>>>>>>>>>>> point of view (at least in a "max" manner)?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Romain Manni-Bucau
>>>>>>>>>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>>>>> jbono...@apache.org
>>>>>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>>>>>> Talend - http://www.talend.com
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>> jbono...@apache.org
>>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>>> Talend - http://www.talend.com