"First immediately blocking issue is how to batch records reliably and *portably* (the biggest beam added-value IMHO). Since bundles are "flush" trigger for most IO it means ensuring the bundle size is somehow controllable or at least not set to a very small value OOTB."
Please cite an existing IO that currently suffers from this issue: I'm not aware of any. On Thu, Nov 30, 2017, 9:46 AM Romain Manni-Bucau <[email protected]> wrote: > @Ben: would all IO be rewritten to use that and the bundle concept > dropped from the API to avoid any ambiguity and misleading usage like > in current IOs? > > Romain Manni-Bucau > @rmannibucau | Blog | Old Blog | Github | LinkedIn > > > 2017-11-30 18:43 GMT+01:00 Ben Chambers <[email protected]>: > > Beam includes a GroupIntoBatches transform (see > > > https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java > ) > > which I believe was intended to be used as part of such a portable IO. It > > can be used to request that elements are divided into batches of some > size > > which can then be used for further processing. > > > > On Thu, Nov 30, 2017 at 9:32 AM Romain Manni-Bucau < > [email protected]> > > wrote: > >> > >> 2017-11-30 18:11 GMT+01:00 Eugene Kirpichov <[email protected]>: > >> > Very strong -1 from me: > >> > - Having a pipeline-global parameter is bad because it will apply to > all > >> > transforms, with no ability to control it for individual transforms. > >> > This > >> > can go especially poorly because it means that when I write a > transform, > >> > I > >> > don't know whether a user will set this parameter in their pipeline > to a > >> > value that's perhaps good for the user's transform but really bad for > my > >> > transform; and the user will likely blame my transform for poor > >> > performance. > >> > A parameter like this should be set on exactly the thing it applies > to: > >> > e.g. > >> > on the particular IO; and it should be set by the IO itself, not by a > >> > user > >> > in pipeline options, because the IO author likely knows better than a > >> > user > >> > what is a good value. > >> > >> This is true and this is worse today since the user can't tune it but > >> the IO doesn't handle it as well. it is up to the runner and none > >> implement it in a way which is IO friendly -check flink and spark > >> which do the exact opposite, bundle=1 vs bundle=datatset/partitions) > >> > >> Also note it is a "max" and not an exact value in the proposal. > >> > >> > - The parameter will not achieve what many IOs want, either. In some > >> > cases, > >> > you want to limit the number of bytes you write. In some cases, you > want > >> > to > >> > limit the number of values within a key that you write. In some cases, > >> > it's > >> > something else - it isn't always elements. > >> > >> Elements is the only thing users can really tune since you can't > >> assume the content. > >> > >> > - The parameter will achieve none of the issues that you I think > raised > >> > in > >> > the thread above: it doesn't give deterministic replay, nor any kind > of > >> > fault tolerance. > >> > >> Right, it only partially solves the first issue popping up: the > >> chunking. However I think it is a quick win. 
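For reference, a minimal sketch of how the GroupIntoBatches transform linked above can be used, assuming the Beam Java SDK of this era. The single constant key and the println standing in for a bulk write are illustrative only; a real IO would shard the key space rather than serialize everything onto one key:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class GroupIntoBatchesSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<String> records = p.apply(Create.of("a", "b", "c", "d", "e"));

    records
        // GroupIntoBatches needs keyed input; batching happens per key and window.
        .apply(WithKeys.<String, String>of("shard-0"))
        .apply(GroupIntoBatches.<String, String>ofSize(2))
        .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // Stands in for one bulk request per batch of up to 2 elements.
            System.out.println("writing batch: " + c.element().getValue());
          }
        }));

    p.run().waitUntilFinish();
  }
}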
> >> > >> > - Having a parameter like this *at all* goes against Beam's "no knobs" > >> > philosophy - for all the usual reasons: 1) it encourages users to > waste > >> > time > >> > looking in the wrong places when doing performance tuning: tuning > >> > parameters > >> > is almost never the best way to improve performance; 2) when users can > >> > set a > >> > tuning parameter, in my experience it is almost always set wrong, or > >> > perhaps > >> > it was once set right but then nobody updates it when the use case or > >> > implementation changes; and we can end up in a situation where the > >> > pipeline > >> > is performing poorly because of the parameter but the runner isn't > >> > allowed > >> > to choose a better value. (in experience with legacy data processing > >> > systems > >> > in Google, like MapReduce, that support plenty of tuning parameters, a > >> > very > >> > common advice to someone complaining about a poorly performing job is > >> > "have > >> > you tried removing all your parameters?") > >> > >> I would be fine with that but what is the alternative? > >> > >> > - I still fail to understand the exact issue we're talking about, and > >> > I've > >> > made a number of suggestions as to how this understanding could be > >> > achieved: > >> > show code that demonstrates the issue; and show how the code could be > >> > improved by a hypothetical API. > >> > >> First immediately blocking issue is how to batch records reliably and > >> *portably* (the biggest beam added-value IMHO). > >> Since bundles are "flush" trigger for most IO it means ensuring the > >> bundle size is somehow controllable or at least not set to a very > >> small value OOTB. > >> > >> An alternative to this proposal can be to let an IO give an hint about > >> its desired bundle size. Would work as well for that particular issue. > >> Does it sound better? > >> > >> > > >> > On Thu, Nov 30, 2017 at 6:17 AM Jean-Baptiste Onofré <[email protected] > > > >> > wrote: > >> >> > >> >> It sounds reasonable to me. > >> >> > >> >> And agree for Spark, I would like to merge Spark 2 update first. > >> >> > >> >> Regards > >> >> JB > >> >> > >> >> On 11/30/2017 03:09 PM, Romain Manni-Bucau wrote: > >> >> > Guys, > >> >> > > >> >> > what about moving getMaxBundleSize from flink options to pipeline > >> >> > options. I think all runners can support it right? Spark code needs > >> >> > the merge of the v2 before being able to be implemented probably > but > >> >> > I > >> >> > don't see any blocker. > >> >> > > >> >> > wdyt? > >> >> > > >> >> > Romain Manni-Bucau > >> >> > @rmannibucau | Blog | Old Blog | Github | LinkedIn > >> >> > > >> >> > > >> >> > 2017-11-19 8:19 GMT+01:00 Romain Manni-Bucau < > [email protected]>: > >> >> >> @Eugene: "workaround" as specific to the IO each time and > therefore > >> >> >> still highlight a lack in the core. > >> >> >> > >> >> >> Other comments inline > >> >> >> > >> >> >> > >> >> >> 2017-11-19 7:40 GMT+01:00 Robert Bradshaw > >> >> >> <[email protected]>: > >> >> >>> There is a possible fourth issue that we don't handle well: > >> >> >>> efficiency. For > >> >> >>> very large bundles, it may be advantageous to avoid replaying a > >> >> >>> bunch > >> >> >>> of > >> >> >>> idempotent operations if there were a way to record what ones > we're > >> >> >>> sure > >> >> >>> went through. 
Not sure if that's the issue here (though one could > >> >> >>> possibly > >> >> >>> do this with SDFs, one can preemptively returning periodically > >> >> >>> before > >> >> >>> an > >> >> >>> element (or portion thereof) is done). > >> >> >> > >> >> >> +1, also lead to the IO handling its own chunking/bundles and > >> >> >> therefore solves all issues at once. > >> >> >> > >> >> >>> > >> >> >>> On Sat, Nov 18, 2017 at 6:58 PM, Eugene Kirpichov < > >> >> >>> [email protected]> wrote: > >> >> >>> > >> >> >>>> I disagree that the usage of document id in ES is a > "workaround" - > >> >> >>>> it > >> >> >>>> does > >> >> >>>> not address any *accidental *complexity > >> >> >>>> <https://en.wikipedia.org/wiki/No_Silver_Bullet> coming from > >> >> >>>> shortcomings > >> >> >>>> of Beam, it addresses the *essential* complexity that a > >> >> >>>> distributed > >> >> >>>> system > >> >> >>>> forces one to take it as a fact of nature that the same write > >> >> >>>> (mutation) will happen multiple times, so if you want a mutation > >> >> >>>> to > >> >> >>>> happen > >> >> >>>> "as-if" it happened exactly once, the mutation itself must be > >> >> >>>> idempotent > >> >> >>>> <https://en.wikipedia.org/wiki/Idempotence>. Insert-with-id > >> >> >>>> (upsert > >> >> >>>> <https://en.wikipedia.org/wiki/Merge_(SQL)>) is a classic > example > >> >> >>>> of > >> >> >>>> an > >> >> >>>> idempotent mutation, and it's very good that Elasticsearch > >> >> >>>> provides > >> >> >>>> it - if > >> >> >>>> it didn't, no matter how good of an API Beam had, achieving > >> >> >>>> exactly-once > >> >> >>>> writes would be theoretically impossible. Are we in agreement on > >> >> >>>> this > >> >> >>>> so > >> >> >>>> far? > >> >> >>>> > >> >> >>>> Next: you seem to be discussing 3 issues together, all of which > >> >> >>>> are > >> >> >>>> valid > >> >> >>>> issues, but they seem unrelated to me: > >> >> >>>> 1. Exactly-once mutation > >> >> >>>> 2. Batching multiple mutations into one RPC. > >> >> >>>> 3. Backpressure > >> >> >>>> > >> >> >>>> #1: was considered above. The system the IO is talking to has to > >> >> >>>> support > >> >> >>>> idempotent mutations, in an IO-specific way, and the IO has to > >> >> >>>> take > >> >> >>>> advantage of them, in the IO-specific way - end of story. > >> >> >> > >> >> >> Agree but don't forget the original point was about "chunks" and > not > >> >> >> individual records. > >> >> >> > >> >> >>>> > >> >> >>>> #2: a batch of idempotent operations is also idempotent, so this > >> >> >>>> doesn't > >> >> >>>> add anything new semantically. Syntactically - Beam already > allows > >> >> >>>> you to > >> >> >>>> write your own batching by notifying you of permitted batch > >> >> >>>> boundaries > >> >> >>>> (Start/FinishBundle). Sure, it could do more, but from my > >> >> >>>> experience > >> >> >>>> the > >> >> >>>> batching in IOs I've seen is one of the easiest and least > >> >> >>>> error-prone > >> >> >>>> parts, so I don't see something worth an extended discussion > here. > >> >> >> > >> >> >> "Beam already allows you to > >> >> >> write your own batching by notifying you of permitted batch > >> >> >> boundaries > >> >> >> (Start/FinishBundle)" > >> >> >> > >> >> >> Is wrong since the bundle is potentially the whole PCollection > >> >> >> (spark) > >> >> >> so this is not even an option until you use the SDF (back to the > >> >> >> same > >> >> >> point). > >> >> >> Once again the API looks fine but no implementation makes it true. 
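As a concrete reading of the "permitted batch boundaries (Start/FinishBundle)" point above, here is a minimal sketch of the batching pattern most IOs use today. flushToSink and the max size are hypothetical placeholders, and, as noted in the reply, on some runners a bundle may span a whole partition, so the eager maxSize flush is what actually bounds a batch:

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;

class BatchingWriteFn extends DoFn<String, Void> {
  private static final int MAX_BATCH_SIZE = 1000;
  private transient List<String> buffer;

  @StartBundle
  public void startBundle() {
    buffer = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    buffer.add(c.element());
    if (buffer.size() >= MAX_BATCH_SIZE) {
      flush(); // eager flush, like the ES IO's maxSize
    }
  }

  @FinishBundle
  public void finishBundle() {
    flush(); // the bundle boundary is the only flush the runner guarantees
  }

  private void flush() {
    if (!buffer.isEmpty()) {
      flushToSink(buffer); // hypothetical bulk write
      buffer.clear();
    }
  }

  private void flushToSink(List<String> batch) {
    // placeholder: issue one bulk request per batch
  }
}

Note that if the bundle is retried, batches already flushed eagerly are written again, which is exactly the non-idempotence concern raised in this thread.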
> >> >> >> It > >> >> >> would be easy to change it in spark, flink can be ok since it > >> >> >> targets > >> >> >> more the streaming case, not sure of others, any idea? > >> >> >> > >> >> >> > >> >> >>>> > >> >> >>>> #3: handling backpressure is a complex problem with multiple > >> >> >>>> facets: > >> >> >>>> 1) how > >> >> >>>> do you know you're being throttled, and by how much are you > >> >> >>>> exceeding > >> >> >>>> the > >> >> >>>> external system's capacity? > >> >> >> > >> >> >> This is the whole point of backpressure, the system sends it back > to > >> >> >> you (header like or status technic in general) > >> >> >> > >> >> >>>> 2) how do you communicate this signal to the > >> >> >>>> runner? > >> >> >> > >> >> >> You are a client so you get the meta in the response - whatever > >> >> >> techno. > >> >> >> > >> >> >>>> 3) what does the runner do in response? > >> >> >> > >> >> >> Runner nothing but the IO adapts its handling as mentionned before > >> >> >> (wait and retry, skip, ... depending the config) > >> >> >> > >> >> >>>> 4) how do you wait until > >> >> >>>> it's ok to try again? > >> >> >> > >> >> >> This is one point to probably enhance in beam but waiting in the > >> >> >> processing is an option if the source has some buffering otherwise > >> >> >> it > >> >> >> requires to have a buffer fallback and max size if the wait mode > is > >> >> >> activated. > >> >> >> > >> >> >>>> You seem to be advocating for solving one facet of this problem, > >> >> >>>> which is: > >> >> >>>> you want it to be possible to signal to the runner "I'm being > >> >> >>>> throttled, > >> >> >>>> please end the bundle", right? If so - I think this (ending the > >> >> >>>> bundle) is > >> >> >>>> unnecessary: the DoFn can simply do an exponential back-off > sleep > >> >> >>>> loop. > >> >> >> > >> >> >> Agree, never said the runner should know but GBK+output doesnt > work > >> >> >> cause you dont own the GBK. > >> >> >> > >> >> >>>> This is e.g. what DatastoreIO does > >> >> >>>> <https://github.com/apache/beam/blob/master/sdks/java/io/ > >> >> >>>> google-cloud-platform/src/main/java/org/apache/beam/sdk/ > >> >> >>>> io/gcp/datastore/DatastoreV1.java#L1318> > >> >> >>>> and > >> >> >>>> this is in general how most systems I've seen handle > backpressure. > >> >> >>>> Is > >> >> >>>> there > >> >> >>>> something I'm missing? In particular, is there any compelling > >> >> >>>> reason > >> >> >>>> why > >> >> >>>> you think it'd be beneficial e.g. for DatastoreIO to commit the > >> >> >>>> results of > >> >> >>>> the bundle so far before processing other elements? > >> >> >> > >> >> >> It was more about ensuring you validate early a subset of the > whole > >> >> >> bundle and avoid to reprocess it if it fails later. > >> >> >> > >> >> >> > >> >> >> So to summarize I see 2 outcomes: > >> >> >> > >> >> >> 1. impl SDF in all runners > >> >> >> 2. make the bundle size upper bounded - through a pipeline option > - > >> >> >> in > >> >> >> all runners, not sure this one is doable everywhere since I mainly > >> >> >> checked spark case > >> >> >> > >> >> >>>> > >> >> >>>> Again, it might be that I'm still misunderstanding what you're > >> >> >>>> trying > >> >> >>>> to > >> >> >>>> say. One of the things it would help to clarify would be - > exactly > >> >> >>>> what do > >> >> >>>> you mean by "how batch frameworks solved that for years": can > you > >> >> >>>> point at > >> >> >>>> an existing API in some other framework that achieves what you > >> >> >>>> want? 
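A minimal sketch of the "exponential back-off sleep loop" approach mentioned above, similar in spirit to what DatastoreIO does; tryWrite and ThrottledException are hypothetical placeholders for whatever the backend client exposes:

import org.apache.beam.sdk.transforms.DoFn;

class BackoffWriteFn extends DoFn<String, Void> {
  private static final int MAX_ATTEMPTS = 8;

  @ProcessElement
  public void processElement(ProcessContext c) throws InterruptedException {
    long backoffMillis = 100;
    for (int attempt = 1; ; attempt++) {
      try {
        tryWrite(c.element()); // hypothetical call to the external system
        return;
      } catch (ThrottledException e) {
        if (attempt >= MAX_ATTEMPTS) {
          throw new RuntimeException("giving up after repeated throttling", e);
        }
        // Backpressure is handled inside the DoFn: sleep, then retry the same element.
        Thread.sleep(backoffMillis);
        backoffMillis = Math.min(backoffMillis * 2, 60_000);
      }
    }
  }

  private void tryWrite(String element) throws ThrottledException {
    // placeholder: one write / RPC to the external system
  }

  /** Hypothetical marker for a "slow down" response from the backend. */
  private static class ThrottledException extends Exception {}
}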
> >> >> >>>> > >> >> >>>> On Sat, Nov 18, 2017 at 1:02 PM Romain Manni-Bucau > >> >> >>>> <[email protected]> > >> >> >>>> wrote: > >> >> >>>> > >> >> >>>>> Eugene, point - and issue with a single sample - is you can > >> >> >>>>> always > >> >> >>>>> find > >> >> >>>>> *workarounds* on a case by case basis as the id one with ES but > >> >> >>>>> beam > >> >> >>>> doesnt > >> >> >>>>> solve the problem as a framework. > >> >> >>>>> > >> >> >>>>> From my past, I clearly dont see how batch frameworks solved > >> >> >>>>> that > >> >> >>>>> for > >> >> >>>> years > >> >> >>>>> and beam is not able to do it - keep in mind it is the same > kind > >> >> >>>>> of > >> >> >>>> techno, > >> >> >>>>> it just uses different sources and bigger clusters so no real > >> >> >>>>> reason > >> >> >>>>> to > >> >> >>>> not > >> >> >>>>> have the same feature quality. The only potential reason i see > is > >> >> >>>>> there > >> >> >>>> is > >> >> >>>>> no tracking of the state into the cluster - e2e. But i dont see > >> >> >>>>> why > >> >> >>>>> there > >> >> >>>>> wouldnt be. Do I miss something here? > >> >> >>>>> > >> >> >>>>> An example could be: take a github crawler computing stats on > the > >> >> >>>>> whole > >> >> >>>>> girhub repos which is based on a rest client as example. You > will > >> >> >>>>> need to > >> >> >>>>> handle the rate limit and likely want to "commit" each time you > >> >> >>>>> reach a > >> >> >>>>> rate limit with likely some buffering strategy with a max size > >> >> >>>>> before > >> >> >>>>> really waiting. How do you do it with a GBK independent of your > >> >> >>>>> dofn? You > >> >> >>>>> are not able to compose correctly the fn between them :(. > >> >> >>>>> > >> >> >>>>> > >> >> >>>>> Le 18 nov. 2017 20:48, "Eugene Kirpichov" > >> >> >>>>> <[email protected]> > >> >> >>>> a > >> >> >>>>> écrit : > >> >> >>>>> > >> >> >>>>> After giving this thread my best attempt at understanding > exactly > >> >> >>>>> what is > >> >> >>>>> the problem and the proposed solution, I'm afraid I still fail > to > >> >> >>>>> understand both. To reiterate, I think the only way to make > >> >> >>>>> progress > >> >> >>>>> here > >> >> >>>>> is to be more concrete: (quote) take some IO that you think > could > >> >> >>>>> be > >> >> >>>> easier > >> >> >>>>> to write with your proposed API, give the contents of a > >> >> >>>>> hypothetical > >> >> >>>>> PCollection being written to this IO, give the code of a > >> >> >>>>> hypothetical > >> >> >>>> DoFn > >> >> >>>>> implementing the write using your API, and explain what you'd > >> >> >>>>> expect > >> >> >>>>> to > >> >> >>>>> happen at runtime. I'm going to re-engage in this thread after > >> >> >>>>> such > >> >> >>>>> an > >> >> >>>>> example is given. > >> >> >>>>> > >> >> >>>>> On Sat, Nov 18, 2017, 5:00 AM Romain Manni-Bucau > >> >> >>>>> <[email protected]> > >> >> >>>>> wrote: > >> >> >>>>> > >> >> >>>>>> First bundle retry is unusable with dome runners like spark > >> >> >>>>>> where > >> >> >>>>>> the > >> >> >>>>>> bundle size is the collection size / number of work. This > means > >> >> >>>>>> a > >> >> >>>>>> user > >> >> >>>>> cant > >> >> >>>>>> use bundle API or feature reliably and portably - which is > beam > >> >> >>>> promise. > >> >> >>>>>> Aligning chunking and bundles would guarantee that bit can be > >> >> >>>>>> not > >> >> >>>>> desired, > >> >> >>>>>> that is why i thought it can be another feature. 
> >> >> >>>>>> > >> >> >>>>>> GBK works until the IO knows about that and both concepts are > >> >> >>>>>> not > >> >> >>>> always > >> >> >>>>>> orthogonal - backpressure like systems is a trivial common > >> >> >>>>>> example. > >> >> >>>> This > >> >> >>>>>> means the IO (dofn) must be able to do it itself at some > point. > >> >> >>>>>> > >> >> >>>>>> Also note the GBK works only if the IO can take a list which > is > >> >> >>>>>> never > >> >> >>>> the > >> >> >>>>>> case today. > >> >> >>>>>> > >> >> >>>>>> Big questions for me are: is SDF the way to go since it > provides > >> >> >>>>>> the > >> >> >>>>> needed > >> >> >>>>>> API bit is not yet supported? What about existing IO? Should > >> >> >>>>>> beam > >> >> >>>> provide > >> >> >>>>>> an auto wrapping of dofn for that pre-aggregated support and > >> >> >>>>>> simulate > >> >> >>>>>> bundles to the actual IO impl to keep the existing API? > >> >> >>>>>> > >> >> >>>>>> > >> >> >>>>>> Le 17 nov. 2017 19:20, "Raghu Angadi" > >> >> >>>>>> <[email protected]> > >> >> >>>>>> a > >> >> >>>>>> écrit : > >> >> >>>>>> > >> >> >>>>>> On Fri, Nov 17, 2017 at 1:02 AM, Romain Manni-Bucau < > >> >> >>>>> [email protected] > >> >> >>>>>>> > >> >> >>>>>> wrote: > >> >> >>>>>> > >> >> >>>>>>> Yep, just take ES IO, if a part of a bundle fails you are in > an > >> >> >>>>>>> unmanaged state. This is the case for all O (of IO ;)). Issue > >> >> >>>>>>> is > >> >> >>>>>>> not > >> >> >>>>>>> much about "1" (the code it takes) but more the fact it > doesn't > >> >> >>>>>>> integrate with runner features and retries potentially: what > >> >> >>>>>>> happens > >> >> >>>>>>> if a bundle has a failure? => undefined today. 2. I'm fine > with > >> >> >>>>>>> it > >> >> >>>>>>> while we know exactly what happens when we restart after a > >> >> >>>>>>> bundle > >> >> >>>>>>> failure. With ES the timestamp can be used for instance. > >> >> >>>>>>> > >> >> >>>>>> > >> >> >>>>>> This deterministic batching can be achieved even now with an > >> >> >>>>>> extra > >> >> >>>>>> GroupByKey (and if you want ordering on top of that, will need > >> >> >>>>>> another > >> >> >>>>>> GBK). Don't know if that is too costly in your case. I would > >> >> >>>>>> need > >> >> >>>>>> bit > >> >> >>>>> more > >> >> >>>>>> details on handling ES IO write retries to see it could be > >> >> >>>>>> simplified. > >> >> >>>>> Note > >> >> >>>>>> that retries occur with or without any failures in your DoFn. > >> >> >>>>>> > >> >> >>>>>> The biggest negative with GBK approach is that it doesn't > >> >> >>>>>> provide > >> >> >>>>>> same > >> >> >>>>>> guarantees on Flink. > >> >> >>>>>> > >> >> >>>>>> I don't see how GroubIntoBatches in Beam provides specific > >> >> >>>>>> guarantees > >> >> >>>> on > >> >> >>>>>> deterministic batches. > >> >> >>>>>> > >> >> >>>>>> Thinking about it the SDF is really a way to do it since the > SDF > >> >> >>>>>> will > >> >> >>>>>>> manage the bulking and associated with the runner "retry" it > >> >> >>>>>>> seems > >> >> >>>>>>> it > >> >> >>>>>>> covers the needs. > >> >> >>>>>>> > >> >> >>>>>>> Romain Manni-Bucau > >> >> >>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn > >> >> >>>>>>> > >> >> >>>>>>> > >> >> >>>>>>> 2017-11-17 9:23 GMT+01:00 Eugene Kirpichov > >> >> >>>>> <[email protected] > >> >> >>>>>>> : > >> >> >>>>>>>> I must admit I'm still failing to understand the problem, so > >> >> >>>>>>>> let's > >> >> >>>>> step > >> >> >>>>>>>> back even further. 
> >> >> >>>>>>>> > >> >> >>>>>>>> Could you give an example of an IO that is currently > difficult > >> >> >>>>>>>> to > >> >> >>>>>>> implement > >> >> >>>>>>>> specifically because of lack of the feature you're talking > >> >> >>>>>>>> about? > >> >> >>>>>>>> > >> >> >>>>>>>> I'm asking because I've reviewed almost all Beam IOs and > don't > >> >> >>>> recall > >> >> >>>>>>>> seeing a similar problem. Sure, a lot of IOs do batching > >> >> >>>>>>>> within a > >> >> >>>>>> bundle, > >> >> >>>>>>>> but 1) it doesn't take up much code (granted, it would be > even > >> >> >>>> easier > >> >> >>>>>> if > >> >> >>>>>>>> Beam did it for us) and 2) I don't remember any of them > >> >> >>>>>>>> requiring > >> >> >>>> the > >> >> >>>>>>>> batches to be deterministic, and I'm having a hard time > >> >> >>>>>>>> imagining > >> >> >>>>> what > >> >> >>>>>>> kind > >> >> >>>>>>>> of storage system would be able to deduplicate if batches > were > >> >> >>>>>>>> deterministic but wouldn't be able to deduplicate if they > >> >> >>>>>>>> weren't. > >> >> >>>>>>>> > >> >> >>>>>>>> On Thu, Nov 16, 2017 at 11:50 PM Romain Manni-Bucau < > >> >> >>>>>>> [email protected]> > >> >> >>>>>>>> wrote: > >> >> >>>>>>>> > >> >> >>>>>>>>> Ok, let me try to step back and summarize what we have > today > >> >> >>>>>>>>> and > >> >> >>>>> what > >> >> >>>>>> I > >> >> >>>>>>>>> miss: > >> >> >>>>>>>>> > >> >> >>>>>>>>> 1. we can handle chunking in beam through group in batch > (or > >> >> >>>>>> equivalent) > >> >> >>>>>>>>> but: > >> >> >>>>>>>>> > it is not built-in into the transforms (IO) and it is > >> >> >>>>> controlled > >> >> >>>>>>>>> from outside the transforms so no way for a transform to do > >> >> >>>>>>>>> it > >> >> >>>>>>>>> properly without handling itself a composition and links > >> >> >>>>>>>>> between > >> >> >>>>>>>>> multiple dofns to have notifications and potentially react > >> >> >>>> properly > >> >> >>>>> or > >> >> >>>>>>>>> handle backpressure from its backend > >> >> >>>>>>>>> 2. there is no restart feature because there is no real > state > >> >> >>>>> handling > >> >> >>>>>>>>> at the moment. this sounds fully delegated to the runner > but > >> >> >>>>>>>>> I > >> >> >>>>>>>>> was > >> >> >>>>>>>>> hoping to have more guarantees from the used API to be able > >> >> >>>>>>>>> to > >> >> >>>>> restart > >> >> >>>>>>>>> a pipeline (mainly batch since it can be irrelevant or > >> >> >>>>>>>>> delegates > >> >> >>>> to > >> >> >>>>>>>>> the backend for streams) and handle only not commited > records > >> >> >>>>>>>>> so > >> >> >>>> it > >> >> >>>>>>>>> requires some persistence outside the main IO storages to > do > >> >> >>>>>>>>> it > >> >> >>>>>>>>> properly > >> >> >>>>>>>>> > note this is somehow similar to the monitoring topic > >> >> >>>>>>>>> which > >> >> >>>> miss > >> >> >>>>>>>>> persistence ATM so it can end up to beam to have a > pluggable > >> >> >>>> storage > >> >> >>>>>>>>> for a few concerns > >> >> >>>>>>>>> > >> >> >>>>>>>>> > >> >> >>>>>>>>> Short term I would be happy with 1 solved properly, long > term > >> >> >>>>>>>>> I > >> >> >>>> hope > >> >> >>>>> 2 > >> >> >>>>>>>>> will be tackled without workarounds requiring custom > wrapping > >> >> >>>>>>>>> of > >> >> >>>> IO > >> >> >>>>> to > >> >> >>>>>>>>> use a custom state persistence. 
> >> >> >>>>>>>>> > >> >> >>>>>>>>> > >> >> >>>>>>>>> > >> >> >>>>>>>>> Romain Manni-Bucau > >> >> >>>>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn > >> >> >>>>>>>>> > >> >> >>>>>>>>> > >> >> >>>>>>>>> 2017-11-17 7:44 GMT+01:00 Jean-Baptiste Onofré > >> >> >>>>>>>>> <[email protected]>: > >> >> >>>>>>>>>> Thanks for the explanation. Agree, we might talk about > >> >> >>>>>>>>>> different > >> >> >>>>>>> things > >> >> >>>>>>>>>> using the same wording. > >> >> >>>>>>>>>> > >> >> >>>>>>>>>> I'm also struggling to understand the use case (for a > >> >> >>>>>>>>>> generic > >> >> >>>>> DoFn). > >> >> >>>>>>>>>> > >> >> >>>>>>>>>> Regards > >> >> >>>>>>>>>> JB > >> >> >>>>>>>>>> > >> >> >>>>>>>>>> > >> >> >>>>>>>>>> On 11/17/2017 07:40 AM, Eugene Kirpichov wrote: > >> >> >>>>>>>>>>> > >> >> >>>>>>>>>>> To avoid spending a lot of time pursuing a false path, > I'd > >> >> >>>>>>>>>>> like > >> >> >>>>> to > >> >> >>>>>>> say > >> >> >>>>>>>>>>> straight up that SDF is definitely not going to help > here, > >> >> >>>>> despite > >> >> >>>>>>> the > >> >> >>>>>>>>>>> fact > >> >> >>>>>>>>>>> that its API includes the term "checkpoint". In SDF, the > >> >> >>>>>> "checkpoint" > >> >> >>>>>>>>>>> captures the state of processing within a single element. > >> >> >>>>>>>>>>> If > >> >> >>>>> you're > >> >> >>>>>>>>>>> applying an SDF to 1000 elements, it will, like any other > >> >> >>>>>>>>>>> DoFn, > >> >> >>>>> be > >> >> >>>>>>>>> applied > >> >> >>>>>>>>>>> to each of them independently and in parallel, and you'll > >> >> >>>>>>>>>>> have > >> >> >>>>> 1000 > >> >> >>>>>>>>>>> checkpoints capturing the state of processing each of > these > >> >> >>>>>> elements, > >> >> >>>>>>>>>>> which > >> >> >>>>>>>>>>> is probably not what you want. > >> >> >>>>>>>>>>> > >> >> >>>>>>>>>>> I'm afraid I still don't understand what kind of > checkpoint > >> >> >>>>>>>>>>> you > >> >> >>>>>>> need, if > >> >> >>>>>>>>>>> it > >> >> >>>>>>>>>>> is not just deterministic grouping into batches. > >> >> >>>>>>>>>>> "Checkpoint" > >> >> >>>> is > >> >> >>>>> a > >> >> >>>>>>> very > >> >> >>>>>>>>>>> broad term and it's very possible that everybody in this > >> >> >>>>>>>>>>> thread > >> >> >>>>> is > >> >> >>>>>>>>> talking > >> >> >>>>>>>>>>> about different things when saying it. So it would help > if > >> >> >>>>>>>>>>> you > >> >> >>>>>> could > >> >> >>>>>>>>> give > >> >> >>>>>>>>>>> a > >> >> >>>>>>>>>>> more concrete example: for example, take some IO that you > >> >> >>>>>>>>>>> think > >> >> >>>>>>> could be > >> >> >>>>>>>>>>> easier to write with your proposed API, give the contents > >> >> >>>>>>>>>>> of a > >> >> >>>>>>>>>>> hypothetical > >> >> >>>>>>>>>>> PCollection being written to this IO, give the code of a > >> >> >>>>>> hypothetical > >> >> >>>>>>>>> DoFn > >> >> >>>>>>>>>>> implementing the write using your API, and explain what > >> >> >>>>>>>>>>> you'd > >> >> >>>>>> expect > >> >> >>>>>>> to > >> >> >>>>>>>>>>> happen at runtime. > >> >> >>>>>>>>>>> > >> >> >>>>>>>>>>> On Thu, Nov 16, 2017 at 10:33 PM Romain Manni-Bucau > >> >> >>>>>>>>>>> <[email protected]> > >> >> >>>>>>>>>>> wrote: > >> >> >>>>>>>>>>> > >> >> >>>>>>>>>>>> @Eugene: yes and the other alternative of Reuven too but > >> >> >>>>>>>>>>>> it > >> >> >>>>>>>>>>>> is > >> >> >>>>>> still > >> >> >>>>>>>>>>>> 1. relying on timers, 2. 
not really checkpointed > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>>> In other words it seems all solutions are to create a > >> >> >>>>>>>>>>>> chunk > >> >> >>>>>>>>>>>> of > >> >> >>>>>> size > >> >> >>>>>>> 1 > >> >> >>>>>>>>>>>> and replayable to fake the lack of chunking in the > >> >> >>>>>>>>>>>> framework. > >> >> >>>>> This > >> >> >>>>>>>>>>>> always implies a chunk handling outside the component > >> >> >>>> (typically > >> >> >>>>>>>>>>>> before for an output). My point is I think IO need it in > >> >> >>>>>>>>>>>> their > >> >> >>>>> own > >> >> >>>>>>>>>>>> "internal" or at least control it themselves since the > >> >> >>>>>>>>>>>> chunk > >> >> >>>>> size > >> >> >>>>>> is > >> >> >>>>>>>>>>>> part of the IO handling most of the time. > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>>> I think JB spoke of the same "group before" trick using > >> >> >>>>>> restrictions > >> >> >>>>>>>>>>>> which can work I have to admit if SDF are implemented by > >> >> >>>>> runners. > >> >> >>>>>> Is > >> >> >>>>>>>>>>>> there a roadmap/status on that? Last time I checked SDF > >> >> >>>>>>>>>>>> was a > >> >> >>>>>> great > >> >> >>>>>>>>>>>> API without support :(. > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>>> Romain Manni-Bucau > >> >> >>>>>>>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>>> 2017-11-17 7:25 GMT+01:00 Eugene Kirpichov > >> >> >>>>>>>>>>>> <[email protected]>: > >> >> >>>>>>>>>>>>> > >> >> >>>>>>>>>>>>> JB, not sure what you mean? SDFs and triggers are > >> >> >>>>>>>>>>>>> unrelated, > >> >> >>>>> and > >> >> >>>>>>> the > >> >> >>>>>>>>>>>>> post > >> >> >>>>>>>>>>>>> doesn't mention the word. Did you mean something else, > >> >> >>>>>>>>>>>>> e.g. > >> >> >>>>>>>>> restriction > >> >> >>>>>>>>>>>>> perhaps? Either way I don't think SDFs are the solution > >> >> >>>>>>>>>>>>> here; > >> >> >>>>>> SDFs > >> >> >>>>>>>>> have > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>>> to > >> >> >>>>>>>>>>>>> > >> >> >>>>>>>>>>>>> do with the ability to split the processing of *a > single > >> >> >>>>> element* > >> >> >>>>>>> over > >> >> >>>>>>>>>>>>> multiple calls, whereas Romain I think is asking for > >> >> >>>> repeatable > >> >> >>>>>>>>> grouping > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>>> of > >> >> >>>>>>>>>>>>> > >> >> >>>>>>>>>>>>> *multiple* elements. > >> >> >>>>>>>>>>>>> > >> >> >>>>>>>>>>>>> Romain - does > >> >> >>>>>>>>>>>>> > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/ > >> >> >>>>>>> core/src/main/java/org/apache/beam/sdk/transforms/ > >> >> >>>> GroupIntoBatches.java > >> >> >>>>>>>>>>>>> > >> >> >>>>>>>>>>>>> do what > >> >> >>>>>>>>>>>>> you want? > >> >> >>>>>>>>>>>>> > >> >> >>>>>>>>>>>>> On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré < > >> >> >>>>>>>>> [email protected]> > >> >> >>>>>>>>>>>>> wrote: > >> >> >>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>> It sounds like the "Trigger" in the Splittable DoFn, > no > >> >> >>>>>>>>>>>>>> ? > >> >> >>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>> > https://beam.apache.org/blog/2017/08/16/splittable-do-fn. 
> >> >> >>>> html > >> >> >>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>> Regards > >> >> >>>>>>>>>>>>>> JB > >> >> >>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>> On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote: > >> >> >>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>> it gives the fn/transform the ability to save a > state - > >> >> >>>>>>>>>>>>>>> it > >> >> >>>>> can > >> >> >>>>>>> get > >> >> >>>>>>>>>>>>>>> back on "restart" / whatever unit we can use, > probably > >> >> >>>> runner > >> >> >>>>>>>>>>>>>>> dependent? Without that you need to rewrite all IO > >> >> >>>>>>>>>>>>>>> usage > >> >> >>>> with > >> >> >>>>>>>>>>>>>>> something like the previous pattern which makes the > IO > >> >> >>>>>>>>>>>>>>> not > >> >> >>>>> self > >> >> >>>>>>>>>>>>>>> sufficient and kind of makes the entry cost and usage > >> >> >>>>>>>>>>>>>>> of > >> >> >>>> beam > >> >> >>>>>> way > >> >> >>>>>>>>>>>>>>> further. > >> >> >>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>> In my mind it is exactly what jbatch/spring-batch > uses > >> >> >>>>>>>>>>>>>>> but > >> >> >>>>>>> adapted > >> >> >>>>>>>>> to > >> >> >>>>>>>>>>>>>>> beam (stream in particular) case. > >> >> >>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>> Romain Manni-Bucau > >> >> >>>>>>>>>>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn > >> >> >>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>> 2017-11-17 6:49 GMT+01:00 Reuven Lax > >> >> >>>>> <[email protected] > >> >> >>>>>>> : > >> >> >>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>> Romain, > >> >> >>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>> Can you define what you mean by checkpoint? What are > >> >> >>>>>>>>>>>>>>>> the > >> >> >>>>>>> semantics, > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>>> what > >> >> >>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>> does it accomplish? > >> >> >>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>> Reuven > >> >> >>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>> On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau > < > >> >> >>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>> [email protected]> > >> >> >>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>> wrote: > >> >> >>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> Yes, what I propose earlier was: > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> I. checkpoint marker: > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> @AnyBeamAnnotation > >> >> >>>>>>>>>>>>>>>>> @CheckpointAfter > >> >> >>>>>>>>>>>>>>>>> public void someHook(SomeContext ctx); > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> II. pipeline.apply(ParDo.of(new > >> >> >>>>>>>>> MyFn()).withCheckpointAlgorithm(new > >> >> >>>>>>>>>>>>>>>>> CountingAlgo())) > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> III. (I like this one less) > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> // in the dofn > >> >> >>>>>>>>>>>>>>>>> @CheckpointTester > >> >> >>>>>>>>>>>>>>>>> public boolean shouldCheckpoint(); > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> IV. 
@Checkpointer Serializable getCheckpoint(); in > >> >> >>>>>>>>>>>>>>>>> the > >> >> >>>> dofn > >> >> >>>>>> per > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>>> element > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> Romain Manni-Bucau > >> >> >>>>>>>>>>>>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> 2017-11-17 6:06 GMT+01:00 Raghu Angadi > >> >> >>>>>>> <[email protected] > >> >> >>>>>>>>>>>>> > >> >> >>>>>>>>>>>>> : > >> >> >>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>> How would you define it (rough API is fine)?. > >> >> >>>>>>>>>>>>>>>>>> Without > >> >> >>>> more > >> >> >>>>>>>>> details, > >> >> >>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>> it is > >> >> >>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>> not easy to see wider applicability and > feasibility > >> >> >>>>>>>>>>>>>>>>>> in > >> >> >>>>>>> runners. > >> >> >>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>> On Thu, Nov 16, 2017 at 1:13 PM, Romain > Manni-Bucau > >> >> >>>>>>>>>>>>>>>>>> < > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> [email protected]> > >> >> >>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>> wrote: > >> >> >>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>> This is a fair summary of the current state but > >> >> >>>>>>>>>>>>>>>>>>> also > >> >> >>>>> where > >> >> >>>>>>> beam > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>>> can > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> have a > >> >> >>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>> very strong added value and make big data great > and > >> >> >>>>> smooth. > >> >> >>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>> Instead of this replay feature isnt checkpointing > >> >> >>>>> willable? > >> >> >>>>>>> In > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> particular > >> >> >>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>> with SDF no? > >> >> >>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>> Le 16 nov. 2017 19:50, "Raghu Angadi" > >> >> >>>>>>>>> <[email protected]> > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>>> a > >> >> >>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>> écrit : > >> >> >>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>> Core issue here is that there is no explicit > >> >> >>>>>>>>>>>>>>>>>>>> concept > >> >> >>>> of > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>>> 'checkpoint' > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> in > >> >> >>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>> Beam (UnboundedSource has a method > >> >> >>>>>>>>>>>>>>>>>>>> 'getCheckpointMark' > >> >> >>>>> but > >> >> >>>>>>> that > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> refers to > >> >> >>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>> the checkoint on external source). Runners do > >> >> >>>> checkpoint > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>>> internally > >> >> >>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>> as > >> >> >>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>> implementation detail. Flink's checkpoint model > is > >> >> >>>>>> entirely > >> >> >>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>> different > >> >> >>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>> from > >> >> >>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>> Dataflow's and Spark's. 
> >> >> >>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>> @StableReplay helps, but it does not explicitly > >> >> >>>>>>>>>>>>>>>>>>>> talk > >> >> >>>>> about > >> >> >>>>>> a > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> checkpoint > >> >> >>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>> by > >> >> >>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>> design. > >> >> >>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>> If you are looking to achieve some guarantees > with > >> >> >>>>>>>>>>>>>>>>>>>> a > >> >> >>>>>>>>> sink/DoFn, I > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> think > >> >> >>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>> it > >> >> >>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>> is better to start with the requirements. I > worked > >> >> >>>>>>>>>>>>>>>>>>>> on > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>>> exactly-once > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> sink > >> >> >>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>> for > >> >> >>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>> Kafka (see KafkaIO.write().withEOS()), where we > >> >> >>>>>> essentially > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>>> reshard > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> the > >> >> >>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>> elements and assign sequence numbers to elements > >> >> >>>>>>>>>>>>>>>>>>>> with > >> >> >>>> in > >> >> >>>>>>> each > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>>> shard. > >> >> >>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>> Duplicates in replays are avoided based on these > >> >> >>>>> sequence > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>>> numbers. > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> DoFn > >> >> >>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>> state API is used to buffer out-of order > replays. > >> >> >>>>>>>>>>>>>>>>>>>> The > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>>> implementation > >> >> >>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>> strategy works in Dataflow but not in Flink > which > >> >> >>>>>>>>>>>>>>>>>>>> has > >> >> >>>> a > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>>> horizontal > >> >> >>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>> checkpoint. KafkaIO checks for compatibility. > >> >> >>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>> On Wed, Nov 15, 2017 at 12:38 AM, Romain > >> >> >>>>>>>>>>>>>>>>>>>> Manni-Bucau > >> >> >>>>>>>>>>>>>>>>>>>> < > >> >> >>>>>>>>>>>>>>>>>>>> [email protected]> > >> >> > >> >> >>>>>>>>>>>>>>>>>>>> wrote: > >> >> >>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>>> Hi guys, > >> >> >>>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>>> The subject is a bit provocative but the topic > is > >> >> >>>> real > >> >> >>>>>> and > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>>> coming > >> >> >>>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>>> again and again with the beam usage: how a dofn > >> >> >>>>>>>>>>>>>>>>>>>>> can > >> >> >>>>>> handle > >> >> >>>>>>>>> some > >> >> >>>>>>>>>>>>>>>>>>>>> "chunking". > >> >> >>>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>>> The need is to be able to commit each N records > >> >> >>>>>>>>>>>>>>>>>>>>> but > >> >> >>>>> with > >> >> >>>>>> N > >> >> >>>>>>> not > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>>> too > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> big. 
> >> >> >>>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>>> The natural API for that in beam is the bundle > >> >> >>>>>>>>>>>>>>>>>>>>> one > >> >> >>>> but > >> >> >>>>>>> bundles > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>>> are > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> not > >> >> >>>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>>> reliable since they can be very small (flink) - > >> >> >>>>>>>>>>>>>>>>>>>>> we > >> >> >>>> can > >> >> >>>>>> say > >> >> >>>>>>> it > >> >> >>>>>>>>> is > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> "ok" > >> >> >>>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>>> even if it has some perf impacts - or too big > >> >> >>>>>>>>>>>>>>>>>>>>> (spark > >> >> >>>>> does > >> >> >>>>>>> full > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>>> size > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> / > >> >> >>>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>>> #workers). > >> >> >>>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>>> The workaround is what we see in the ES I/O: a > >> >> >>>> maxSize > >> >> >>>>>>> which > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>>> does > >> >> >>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>> an > >> >> >>>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>>> eager flush. The issue is that then the > >> >> >>>>>>>>>>>>>>>>>>>>> checkpoint > >> >> >>>>>>>>>>>>>>>>>>>>> is > >> >> >>>>> not > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>>> respected > >> >> >>>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>>> and you can process multiple times the same > >> >> >>>>>>>>>>>>>>>>>>>>> records. > >> >> >>>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>>> Any plan to make this API reliable and > >> >> >>>>>>>>>>>>>>>>>>>>> controllable > >> >> >>>>> from > >> >> >>>>>> a > >> >> >>>>>>>>> beam > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> point > >> >> >>>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>>> of view (at least in a max manner)? > >> >> >>>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>>> Thanks, > >> >> >>>>>>>>>>>>>>>>>>>>> Romain Manni-Bucau > >> >> >>>>>>>>>>>>>>>>>>>>> @rmannibucau | Blog | Old Blog | Github | > >> >> >>>>>>>>>>>>>>>>>>>>> LinkedIn > >> >> >>>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>> > >> >> >>>>>>>>>>>>>> -- > >> >> >>>>>>>>>>>>>> Jean-Baptiste Onofré > >> >> >>>>>>>>>>>>>> [email protected] > >> >> >>>>>>>>>>>>>> http://blog.nanthrax.net > >> >> >>>>>>>>>>>>>> Talend - http://www.talend.com > >> >> >>>>>>>>>>>>>> > >> >> >>>>>>>>>>>> > >> >> >>>>>>>>>>> > >> >> >>>>>>>>>> > >> >> >>>>>>>>>> -- > >> >> >>>>>>>>>> Jean-Baptiste Onofré > >> >> >>>>>>>>>> [email protected] > >> >> >>>>>>>>>> http://blog.nanthrax.net > >> >> >>>>>>>>>> Talend - http://www.talend.com > >> >> >>>>>>>>> > >> >> >>>>>>> > >> >> >>>>>> > >> >> >>>>> > >> >> >>>> > >> >> > >> >> -- > >> >> Jean-Baptiste Onofré > >> >> [email protected] > >> >> http://blog.nanthrax.net > >> >> Talend - http://www.talend.com > >> > > > >
