So is your main concern potential poor performance on runners that choose to use a very small bundle size? (Currently an IO can trivially handle too large bundles, simply by flushing when enough data accumulates, which is what all IOs do - but indeed working around having unreasonably small bundles is much harder)
If so, I think, rather than making a model change, we should understand why those runners are choosing such a small bundle size, and potentially fix them.

On Thu, Nov 30, 2017, 11:01 AM Romain Manni-Bucau wrote:

On Nov 30, 2017, 19:23, Kenneth Knowles wrote:

> On Thu, Nov 30, 2017 at 10:03 AM, Romain Manni-Bucau wrote:
>
> > Hmm,
> >
> > ESIO: https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L847
> > JDBCIO: https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L592
> > MongoIO: https://github.com/apache/beam/blob/master/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java#L657
> > etc...
> >
> > They all use the same pattern.
>
> This is actually correct: if you have some triggers set up to yield low latency, then things need to actually be flushed here. If the runner provides a one-element bundle, it could be because data volume has dipped. In this case you pay per element instead of getting good amortization, but since data volume is low this is not so bad, and it is anyhow the only way to yield the desired latency.
>
> Romain, just to echo some others: did you have a particular combination of runner + IO that you wanted to target for improvement? That would focus the discussion, and we could think about what to change in the runner or the IO, or discover an issue that they cannot solve.

I want to ensure EsIO will never flush a single element on any runner without a timer trigger, assuming the data volume is continuous and larger than one element.

Rephrased: my concern is that the bundle, which is great feedback about the infrastructure/environment, is today owned by Beam code, which defeats that purpose since Beam doesn't use the implementation for that. This notion should either be unified across runners (size + timeout are often the defaults to trigger an "end" and would work) or be hidden from transform developers, IMHO.

Note the low-latency point is not linked to bundle size but, as you mentioned, to triggers (timeout- or event-based), which means both worlds can work together in harmony and result in a valid bundle API (without any change, yes) exposed to the user.

> Kenn
>
> > From what you wrote (and technically I agree, but in the current state I think my point is valid), you should drop bundle from the whole user API and make it all @Internal, no?
> >
> > Romain Manni-Bucau
> > @rmannibucau | Blog | Old Blog | Github | LinkedIn

On 2017-11-30 18:58 GMT+01:00, Jean-Baptiste Onofré wrote:

Agree, but maybe we can inform the runner if wanted, no?

Honestly, from my side, I'm fine with the current situation as it's runner specific.

Regards
JB

On 11/30/2017 06:12 PM, Reuven Lax wrote:

I don't think it belongs in PipelineOptions, as bundle size is always a runner thing.

We could consider adding a new generic RunnerOptions; however, I'm not convinced all runners can actually support this.

On Thu, Nov 30, 2017 at 6:09 AM, Romain Manni-Bucau wrote:

Guys,

what about moving getMaxBundleSize from the Flink options to the pipeline options? I think all runners can support it, right? The Spark code probably needs the v2 merge before it can be implemented, but I don't see any blocker.

wdyt?
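The buffering pattern behind the ElasticsearchIO, JdbcIO and MongoDbIO links above, together with the "size + timeout" end condition Romain mentions, can be modeled in a few lines of plain Java. This is a minimal sketch, not Beam code: `ChunkedWriter`, its parameters and the `Consumer` standing in for the bulk RPC are hypothetical, and `checkTimeout()` is a hypothetical hook modeling the timeout policy rather than an existing DoFn callback. Only `processElement` and `finishBundle` correspond to real DoFn lifecycle methods (`@ProcessElement`, `@FinishBundle`).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Plain-Java model (not Beam code): elements are buffered and written as one bulk request
// when the buffer is full, when the oldest buffered element has waited too long, or when
// the bundle ends.
class ChunkedWriter<T> {
    private final int maxBatchSize;
    private final long maxDelayMillis;
    private final LongSupplier clock;          // e.g. System::currentTimeMillis; injectable for tests
    private final Consumer<List<T>> bulkWrite; // stands in for the IO's bulk RPC
    private final List<T> buffer = new ArrayList<>();
    private long oldestBufferedAt;

    ChunkedWriter(int maxBatchSize, long maxDelayMillis, LongSupplier clock, Consumer<List<T>> bulkWrite) {
        if (maxBatchSize <= 0) throw new IllegalArgumentException("maxBatchSize must be > 0");
        this.maxBatchSize = maxBatchSize;
        this.maxDelayMillis = maxDelayMillis;
        this.clock = clock;
        this.bulkWrite = bulkWrite;
    }

    // Role of @ProcessElement: flush as soon as enough data accumulates, so bundles
    // that are too large never produce oversized requests.
    void processElement(T element) {
        if (buffer.isEmpty()) oldestBufferedAt = clock.getAsLong();
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) flush();
    }

    // Hypothetical timeout hook (the "timeout" half of "size + timeout"): ends the chunk
    // early if its oldest element has waited at least maxDelayMillis.
    void checkTimeout() {
        if (!buffer.isEmpty() && clock.getAsLong() - oldestBufferedAt >= maxDelayMillis) flush();
    }

    // Role of @FinishBundle: the bundle boundary forces a flush of whatever is left,
    // which is why one-element bundles produce one-element requests.
    void finishBundle() {
        flush();
    }

    private void flush() {
        if (buffer.isEmpty()) return;
        bulkWrite.accept(new ArrayList<>(buffer));
        buffer.clear();
    }
}
```

With `maxBatchSize = 3`, seven elements in one bundle produce requests of 3, 3 and 1 elements. A runner that emits one-element bundles forces one request per element whatever `maxBatchSize` is, which is the case the first message discusses; the timeout can only end a chunk earlier, never later.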
On 2017-11-19 8:19 GMT+01:00, Romain Manni-Bucau wrote:

@Eugene: I call it a "workaround" because it is specific to each IO, and therefore it still highlights a gap in the core.

Other comments inline.

On 2017-11-19 7:40 GMT+01:00, Robert Bradshaw wrote:

> There is a possible fourth issue that we don't handle well: efficiency. For very large bundles, it may be advantageous to avoid replaying a bunch of idempotent operations if there were a way to record which ones we're sure went through. Not sure if that's the issue here (though one could possibly do this with SDFs, since one can preemptively return periodically before an element, or a portion thereof, is done).

+1, this also leads to the IO handling its own chunking/bundles, and therefore solves all the issues at once.

> On Sat, Nov 18, 2017 at 6:58 PM, Eugene Kirpichov wrote:
>
> > I disagree that the usage of document id in ES is a "workaround": it does not address any *accidental* complexity <https://en.wikipedia.org/wiki/No_Silver_Bullet> coming from shortcomings of Beam, it addresses the *essential* complexity that a distributed system forces one to take as a fact of nature: the same write (mutation) will happen multiple times, so if you want a mutation to happen "as if" it happened exactly once, the mutation itself must be idempotent <https://en.wikipedia.org/wiki/Idempotence>. Insert-with-id (upsert <https://en.wikipedia.org/wiki/Merge_(SQL)>) is a classic example of an idempotent mutation, and it's very good that Elasticsearch provides it: if it didn't, no matter how good an API Beam had, achieving exactly-once writes would be theoretically impossible. Are we in agreement on this so far?
> >
> > Next: you seem to be discussing 3 issues together, all of which are valid issues, but they seem unrelated to me:
> > 1. Exactly-once mutation
> > 2. Batching multiple mutations into one RPC
> > 3. Backpressure
> >
> > #1 was considered above. The system the IO is talking to has to support idempotent mutations, in an IO-specific way, and the IO has to take advantage of them, in the IO-specific way. End of story.

Agree, but don't forget the original point was about "chunks", not individual records.

> > #2: a batch of idempotent operations is also idempotent, so this doesn't add anything new semantically. Syntactically, Beam already allows you to write your own batching by notifying you of permitted batch boundaries (Start/FinishBundle). Sure, it could do more, but in my experience the batching in IOs I've seen is one of the easiest and least error-prone parts, so I don't see something worth an extended discussion here.

"Beam already allows you to write your own batching by notifying you of permitted batch boundaries (Start/FinishBundle)" is wrong, since the bundle is potentially the whole PCollection (Spark), so this is not even an option until you use SDF (back to the same point). Once again, the API looks fine, but no implementation makes it true. It would be easy to change in Spark; Flink can be OK since it targets the streaming case more; not sure about the others, any ideas?

> > #3: handling backpressure is a complex problem with multiple facets: 1) how do you know you're being throttled, and by how much are you exceeding the external system's capacity?

This is the whole point of backpressure: the system sends it back to you (typically via a header or a status).

> > 2) how do you communicate this signal to the runner?

You are a client, so you get the metadata in the response, whatever the technology.

> > 3) what does the runner do in response?

The runner does nothing, but the IO adapts its handling as mentioned before (wait and retry, skip, ... depending on the configuration).

> > 4) how do you wait until it's OK to try again?

This is one point Beam could probably improve, but waiting in the processing is an option if the source has some buffering; otherwise it requires a buffer fallback and a max size when the wait mode is activated.

> > You seem to be advocating for solving one facet of this problem, which is: you want it to be possible to signal to the runner "I'm being throttled, please end the bundle", right? If so, I think this (ending the bundle) is unnecessary: the DoFn can simply do an exponential back-off sleep loop.

Agree, I never said the runner should know, but GBK + output doesn't work because you don't own the GBK.

> > This is e.g. what DatastoreIO does <https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java#L1318>, and this is in general how most systems I've seen handle backpressure. Is there something I'm missing? In particular, is there any compelling reason why you think it'd be beneficial e.g. for DatastoreIO to commit the results of the bundle so far before processing other elements?

It was more about validating a subset of the whole bundle early, and avoiding reprocessing it if something fails later.

So to summarize, I see 2 outcomes:

1. implement SDF in all runners
2. make the bundle size upper-bounded, through a pipeline option, in all runners (not sure this one is doable everywhere, since I mainly checked the Spark case)

> > Again, it might be that I'm still misunderstanding what you're trying to say. One of the things that would help clarify would be: exactly what do you mean by "how batch frameworks solved that for years"? Can you point at an existing API in some other framework that achieves what you want?

On Sat, Nov 18, 2017 at 1:02 PM, Romain Manni-Bucau wrote:

Eugene, the point (and the issue with a single sample) is that you can always find *workarounds* on a case-by-case basis, like the id one with ES, but Beam doesn't solve the problem as a framework.
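Eugene's argument that insert-with-id is what makes replays harmless can be checked with a toy model. The sketch below is plain Java, not an Elasticsearch client; `ToyIndex`, `append` and `upsert` are hypothetical names. It is meant to be driven by replaying the same batch twice, as a runner may do after a bundle failure, and comparing a non-idempotent append with an upsert keyed by document id.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (not an Elasticsearch client) contrasting two ways of writing documents.
class ToyIndex {
    private final List<String> appended = new ArrayList<>();
    private final Map<String, String> byId = new HashMap<>();

    // Non-idempotent: every retry of the same write adds another copy.
    void append(String doc) {
        appended.add(doc);
    }

    // Idempotent insert-with-id (upsert): repeating the same (id, doc) write
    // leaves the stored state unchanged.
    void upsert(String id, String doc) {
        byId.put(id, doc);
    }

    int appendedCount() {
        return appended.size();
    }

    int upsertedCount() {
        return byId.size();
    }
}
```

After writing a two-document batch twice, the append-only store holds four documents while the upsert store still holds two. That is also Eugene's point #2 in miniature: a batch made of idempotent operations is itself idempotent.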
From my past experience, I clearly don't see why batch frameworks have solved that for years and Beam is not able to. Keep in mind it is the same kind of technology; it just uses different sources and bigger clusters, so there is no real reason not to have the same feature quality. The only potential reason I see is that there is no end-to-end tracking of the state in the cluster, but I don't see why there wouldn't be. Am I missing something here?

An example could be a GitHub crawler computing stats on all GitHub repos, based on a REST client. You will need to handle the rate limit, and you will likely want to "commit" each time you hit the rate limit, probably with some buffering strategy with a max size before really waiting. How do you do that with a GBK independent of your DoFn? You are not able to compose the fns correctly :(.

On Nov 18, 2017, 20:48, Eugene Kirpichov wrote:

After giving this thread my best attempt at understanding exactly what the problem and the proposed solution are, I'm afraid I still fail to understand both. To reiterate, I think the only way to make progress here is to be more concrete: (quote) take some IO that you think could be easier to write with your proposed API, give the contents of a hypothetical PCollection being written to this IO, give the code of a hypothetical DoFn implementing the write using your API, and explain what you'd expect to happen at runtime.
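For rate limits like the ones in Romain's GitHub-crawler example, Eugene's suggestion quoted above is an exponential back-off sleep loop inside the DoFn. The sketch below assumes a hypothetical `ThrottledException` that signals a throttled call (for example an HTTP 429); `Backoff`, its `Sleeper` hook and the parameter values are illustrative only and are not DatastoreIO's actual code.

```java
import java.util.concurrent.Callable;

// Hypothetical signal that the external system throttled the call (e.g. HTTP 429 or a rate-limit header).
class ThrottledException extends Exception {
    ThrottledException(String message) {
        super(message);
    }
}

// Minimal exponential back-off loop of the kind a DoFn can run around a throttled call.
class Backoff {
    // Injectable sleep so the loop can be tested without real waiting; Thread::sleep in production.
    interface Sleeper {
        void sleep(long millis) throws InterruptedException;
    }

    static <T> T callWithBackoff(Callable<T> call, int maxAttempts, long initialDelayMillis,
                                 long maxDelayMillis, Sleeper sleeper) throws Exception {
        long delay = initialDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return call.call();
            } catch (ThrottledException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up: the exception fails the bundle, which the runner may retry
                }
                sleeper.sleep(delay);
                delay = Math.min(delay * 2, maxDelayMillis); // double the wait, up to a cap
            }
        }
    }
}
```

Doubling the delay up to a cap keeps a throttled worker from hammering the service; once `maxAttempts` is exhausted the exception propagates, so the decision to retry moves back to the runner.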
I'm going to re-engage in this thread after such an example is given.

On Sat, Nov 18, 2017, 5:00 AM Romain Manni-Bucau wrote:

First, bundle retry is unusable with some runners like Spark, where the bundle size is the collection size divided by the number of workers. This means a user can't use the bundle API or feature reliably and portably, which is Beam's promise. Aligning chunking and bundles would guarantee that, but it may not be desired, which is why I thought it could be a separate feature.

GBK works until the IO needs to be aware of it, and the two concepts are not always orthogonal; backpressure-like systems are a trivial, common example. This means the IO (DoFn) must be able to do it itself at some point.

Also note that GBK works only if the IO can take a list, which is never the case today.

The big questions for me are: is SDF the way to go, since it provides the needed API but is not yet supported? What about existing IOs? Should Beam provide automatic wrapping of a DoFn for that pre-aggregated support, and simulate bundles for the actual IO implementation, to keep the existing API?

On Nov 17, 2017, 19:20, Raghu Angadi wrote:

On Fri, Nov 17, 2017 at 1:02 AM, Romain Manni-Bucau wrote:

> Yep, just take ES IO: if a part of a bundle fails, you are in an unmanaged state. This is the case for all O (of IO ;)). The issue is not so much about "1" (the code it takes) but more the fact that it doesn't integrate with runner features and, potentially, retries: what happens if a bundle has a failure? => undefined today. 2. I'm fine with it as long as we know exactly what happens when we restart after a bundle failure. With ES, the timestamp can be used, for instance.

This deterministic batching can be achieved even now with an extra GroupByKey (and if you want ordering on top of that, you will need another GBK). I don't know if that is too costly in your case. I would need a bit more detail on handling ES IO write retries to see whether it could be simplified. Note that retries occur with or without any failures in your DoFn.

The biggest negative with the GBK approach is that it doesn't provide the same guarantees on Flink.

I don't see how GroupIntoBatches in Beam provides specific guarantees on deterministic batches.

> Thinking about it, SDF is really a way to do it, since the SDF will manage the bulking and, associated with the runner "retry", it seems to cover the needs.
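Raghu's extra-GroupByKey suggestion can be modeled without Beam as "group, sort, then cut batches". This is a plain-Java sketch under the assumption that the shard key is derived from the element itself; `DeterministicBatches` is a hypothetical name. In a real pipeline the stability on replay would come from a runner persisting the grouped output (which, per Raghu, Flink does not guarantee in the same way); here the computation itself is made deterministic instead.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (not Beam code) of "group, then batch". The result depends only on the
// set of inputs, so a replay yields the same batches and batch ids regardless of arrival order.
class DeterministicBatches {
    static Map<String, List<String>> batch(List<String> elements, int numShards, int batchSize) {
        // Step 1, the GroupByKey: shard key derived from the element itself.
        Map<Integer, List<String>> shards = new TreeMap<>();
        for (String e : elements) {
            int shard = Math.floorMod(e.hashCode(), numShards);
            shards.computeIfAbsent(shard, k -> new ArrayList<>()).add(e);
        }
        // Step 2, the ordering: grouped values carry no guaranteed order, so sort them.
        Map<String, List<String>> batches = new TreeMap<>();
        for (Map.Entry<Integer, List<String>> entry : shards.entrySet()) {
            List<String> sorted = new ArrayList<>(entry.getValue());
            Collections.sort(sorted);
            // Step 3: cut fixed-size batches; the id identifies the same batch across replays.
            for (int start = 0; start < sorted.size(); start += batchSize) {
                String batchId = entry.getKey() + "-" + (start / batchSize);
                int end = Math.min(start + batchSize, sorted.size());
                batches.put(batchId, new ArrayList<>(sorted.subList(start, end)));
            }
        }
        return batches;
    }
}
```

Feeding the same elements in reverse order yields identical batches and ids, so a sink that records which batch ids it has already written could skip them on replay.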
On 2017-11-17 9:23 GMT+01:00, Eugene Kirpichov wrote:

I must admit I'm still failing to understand the problem, so let's step back even further.

Could you give an example of an IO that is currently difficult to implement specifically because of the lack of the feature you're talking about?

I'm asking because I've reviewed almost all Beam IOs and don't recall seeing a similar problem. Sure, a lot of IOs do batching within a bundle, but 1) it doesn't take up much code (granted, it would be even easier if Beam did it for us) and 2) I don't remember any of them requiring the batches to be deterministic, and I'm having a hard time imagining what kind of storage system would be able to deduplicate if batches were deterministic but wouldn't be able to deduplicate if they weren't.

On Thu, Nov 16, 2017 at 11:50 PM, Romain Manni-Bucau wrote:

OK, let me try to step back and summarize what we have today and what I miss:

1. We can handle chunking in Beam through GroupIntoBatches (or equivalent), but:
   - it is not built into the transforms (IOs), and it is controlled from outside the transforms, so there is no way for a transform to do it properly without itself handling a composition of, and links between, multiple DoFns to get notifications and potentially react properly or handle backpressure from its backend.
2. There is no restart feature because there is no real state handling at the moment. This seems fully delegated to the runner, but I was hoping for more guarantees from the API in use, to be able to restart a pipeline (mainly batch, since for streams it can be irrelevant or delegated to the backend) and handle only records that were not committed. That requires some persistence outside the main IO storage to do it properly.
   - Note this is somewhat similar to the monitoring topic, which lacks persistence at the moment, so Beam may end up with pluggable storage for a few concerns.

Short term, I would be happy with 1 solved properly; long term, I hope 2 will be tackled without workarounds that require custom wrapping of IOs to use custom state persistence.
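Romain's second item, restarting and handling only records that were not committed, needs the persistence outside the main IO storage he mentions. Beam has no such API; the sketch below is a hypothetical plain-Java illustration in which `CommitLog` stands in for that external store (an in-memory set here) and `RestartableChunkWriter` skips chunks already recorded as committed.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical external commit log: the "persistence outside the main IO storage" a restart
// would consult. An in-memory set stands in for a durable store here.
class CommitLog {
    private final Set<String> committed = new HashSet<>();

    boolean isCommitted(String chunkId) {
        return committed.contains(chunkId);
    }

    void markCommitted(String chunkId) {
        committed.add(chunkId);
    }
}

// Writes fixed-size chunks and records each chunk id after a successful write; a rerun over
// the same input skips chunks already committed. Chunk ids are positional, so this assumes
// the input order is identical on every run (i.e. deterministic chunking).
class RestartableChunkWriter {
    static int writeAll(List<String> records, int chunkSize, CommitLog log, Consumer<List<String>> sink) {
        int written = 0;
        for (int start = 0; start < records.size(); start += chunkSize) {
            String chunkId = "chunk-" + (start / chunkSize);
            if (log.isCommitted(chunkId)) {
                continue; // already written before the failure
            }
            int end = Math.min(start + chunkSize, records.size());
            sink.accept(new ArrayList<>(records.subList(start, end))); // may throw, leaving the chunk uncommitted
            log.markCommitted(chunkId);
            written++;
        }
        return written;
    }
}
```

A crash between the write and `markCommitted` still re-sends that chunk on restart, so the write itself must remain idempotent, which ties this back to Eugene's insert-with-id argument.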
On 2017-11-17 7:44 GMT+01:00, Jean-Baptiste Onofré wrote:

Thanks for the explanation. Agree, we might be talking about different things using the same wording.

I'm also struggling to understand the use case (for a generic DoFn).

Regards
JB

On 11/17/2017 07:40 AM, Eugene Kirpichov wrote:

To avoid spending a lot of time pursuing a false path, I'd like to say straight up that SDF is definitely not going to help here, despite the fact that its API includes the term "checkpoint". In SDF, the "checkpoint" captures the state of processing within a single element. If you're applying an SDF to 1000 elements, it will, like any other DoFn, be applied to each of them independently and in parallel, and you'll have 1000 checkpoints capturing the state of processing each of these elements, which is probably not what you want.
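Eugene's point that an SDF checkpoint is per element can be seen in a small model. This is plain Java, not the Beam SDF API: `Restriction` and `PerElementCheckpoints` are hypothetical stand-ins for an element's restriction and its processing, and "checkpointing" here simply means stopping after a budget of offsets and returning the unprocessed remainder.

```java
import java.util.ArrayList;
import java.util.List;

// Toy restriction: the half-open offset range [from, to) of one element's work.
class Restriction {
    final long from;
    final long to;

    Restriction(long from, long to) {
        this.from = from;
        this.to = to;
    }

    @Override
    public String toString() {
        return "[" + from + ", " + to + ")";
    }
}

class PerElementCheckpoints {
    // Processes at most `budget` offsets of one element's restriction, then stops.
    // Returns the unprocessed remainder (this element's checkpoint), or null if done.
    static Restriction processWithBudget(Restriction r, long budget) {
        long position = r.from;
        while (position < r.to && position - r.from < budget) {
            position++; // claim and process one offset
        }
        return position < r.to ? new Restriction(position, r.to) : null;
    }

    // The same per-element logic applied to many elements: one independent residual per
    // unfinished element, and nothing that spans a group of elements.
    static List<Restriction> checkpointAll(List<Restriction> elements, long budget) {
        List<Restriction> residuals = new ArrayList<>();
        for (Restriction r : elements) {
            Restriction residual = processWithBudget(r, budget);
            if (residual != null) residuals.add(residual);
        }
        return residuals;
    }
}
```

Running it over 1000 elements yields 1000 independent residuals and no checkpoint that covers a group of elements, which is the mismatch Eugene describes.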
I'm afraid I still don't understand what kind of checkpoint you need, if it is not just deterministic grouping into batches. "Checkpoint" is a very broad term, and it's very possible that everybody in this thread is talking about different things when saying it. So it would help if you could give a more concrete example: for example, take some IO that you think could be easier to write with your proposed API, give the contents of a hypothetical PCollection being written to this IO, give the code of a hypothetical DoFn implementing the write using your API, and explain what you'd expect to happen at runtime.

On Thu, Nov 16, 2017 at 10:33 PM, Romain Manni-Bucau wrote:

@Eugene: yes, and Reuven's alternative too, but it is still 1. relying on timers and 2. not really checkpointed.

In other words, it seems all the solutions create a replayable chunk of size 1 to fake the lack of chunking in the framework. This always implies chunk handling outside the component (typically before it, for an output). My point is that I think IOs need it "internally", or at least need to control it themselves, since the chunk size is part of the IO's handling most of the time.

I think JB spoke of the same "group before" trick using restrictions, which I have to admit can work if SDFs are implemented by runners. Is there a roadmap/status on that? Last time I checked, SDF was a great API without support :(.

On 2017-11-17 7:25 GMT+01:00, Eugene Kirpichov wrote:

JB, not sure what you mean? SDFs and triggers are unrelated, and the post doesn't mention the word. Did you mean something else, e.g. restriction perhaps? Either way, I don't think SDFs are the solution here; SDFs have to do with the ability to split the processing of *a single element* over multiple calls, whereas Romain, I think, is asking for repeatable grouping of *multiple* elements.
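Grouping *multiple* elements by key is what Beam's `GroupIntoBatches` does for keyed input: it buffers values per key and emits a batch once the configured size is reached, flushing the remainder when the window expires. The sketch below is a simplified plain-Java model of that behavior, not the transform's implementation; `PerKeyBatcher` and `onWindowExpiry` are hypothetical names, and a `HashMap` stands in for per-key state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Simplified model of per-key batching: each key has its own buffer that survives across
// bundles; a batch is emitted when the buffer reaches batchSize, and leftovers are emitted
// when the window expires.
class PerKeyBatcher<K, V> {
    private final int batchSize;
    private final BiConsumer<K, List<V>> output;
    private final Map<K, List<V>> buffers = new HashMap<>(); // stands in for per-key state

    PerKeyBatcher(int batchSize, BiConsumer<K, List<V>> output) {
        this.batchSize = batchSize;
        this.output = output;
    }

    void process(K key, V value) {
        List<V> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() >= batchSize) {
            output.accept(key, new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    // Stands in for the end-of-window timer: emit every non-empty per-key buffer.
    void onWindowExpiry() {
        for (Map.Entry<K, List<V>> entry : buffers.entrySet()) {
            if (!entry.getValue().isEmpty()) {
                output.accept(entry.getKey(), new ArrayList<>(entry.getValue()));
            }
        }
        buffers.clear();
    }
}
```

Because buffers are per key rather than per bundle, batch size no longer depends on how the runner cuts bundles. Which values land in which batch still depends on arrival order, which matches Raghu's remark that GroupIntoBatches does not by itself make batches deterministic.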
Romain, does https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java do what you want?

On Thu, Nov 16, 2017 at 10:19 PM, Jean-Baptiste Onofré wrote:

It sounds like the "Trigger" in the Splittable DoFn, no?

https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html

Regards
JB

On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote:

It gives the fn/transform the ability to save a state that it can get back on "restart" (or whatever unit we can use, probably runner dependent). Without that, you need to rewrite every IO usage with something like the previous pattern, which makes the IO not self-sufficient and raises both the entry cost and the cost of using Beam.
In my mind, it is exactly what JBatch/Spring Batch use, but adapted to the Beam case (streams in particular).

On 2017-11-17 6:49 GMT+01:00, Reuven Lax wrote:

Romain,

Can you define what you mean by checkpoint? What are the semantics, what does it accomplish?

Reuven

On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau wrote:

Yes, what I proposed earlier was:

I. A checkpoint marker:

    @AnyBeamAnnotation
    @CheckpointAfter
    public void someHook(SomeContext ctx);

II.

    pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new CountingAlgo()))

III. (I like this one less)

    // in the DoFn
    @CheckpointTester
    public boolean shouldCheckpoint();

IV. In the DoFn, per element:

    @Checkpointer Serializable getCheckpoint();

On 2017-11-17 6:06 GMT+01:00, Raghu Angadi wrote:

How would you define it (a rough API is fine)? Without more details, it is not easy to see wider applicability and feasibility in runners.

On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau wrote:

This is a fair summary of the current state, but also of where Beam can add very strong value and make big data great and smooth.
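Romain's options I and II (a `@CheckpointAfter` hook plus a pluggable `withCheckpointAlgorithm(new CountingAlgo())`) can be made concrete to show what a runner would have to call. None of these types exist in Beam: `CheckpointAlgorithm`, `CountingAlgo` and `CheckpointingDriver` below are hypothetical, and the driver is only a stand-in for the runner.

```java
import java.util.List;

// Hypothetical policy deciding when a checkpoint is due (proposal II). Not a Beam type.
interface CheckpointAlgorithm {
    void onElement();           // called after each processed element
    boolean shouldCheckpoint(); // true when a checkpoint is due
    void reset();               // called once the checkpoint has completed
}

// Hypothetical counting policy: checkpoint every `every` elements.
class CountingAlgo implements CheckpointAlgorithm {
    private final int every;
    private int count;

    CountingAlgo(int every) {
        this.every = every;
    }

    @Override public void onElement() { count++; }
    @Override public boolean shouldCheckpoint() { return count >= every; }
    @Override public void reset() { count = 0; }
}

// Hypothetical stand-in for the runner: feeds elements to the fn and calls its checkpoint
// hook (the role of proposal I's @CheckpointAfter method) whenever the policy says so.
class CheckpointingDriver {
    interface Fn<T> {
        void process(T element);
        void checkpoint(); // e.g. flush buffered writes, then persist progress
    }

    static <T> void run(List<T> input, Fn<T> fn, CheckpointAlgorithm algo) {
        boolean pending = false; // elements processed since the last checkpoint
        for (T element : input) {
            fn.process(element);
            algo.onElement();
            pending = true;
            if (algo.shouldCheckpoint()) {
                fn.checkpoint();
                algo.reset();
                pending = false;
            }
        }
        if (pending) {
            fn.checkpoint(); // commit the tail at the end of the input
        }
    }
}
```

With `CountingAlgo(2)`, five elements give checkpoints after the second and fourth elements plus one for the tail. Raghu's feasibility question is exactly what this sketch leaves out: what a checkpoint means when a real runner retries or rebalances work.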
Instead of this replay feature, isn't checkpointing desirable? In particular with SDF, no?

On 16 Nov 2017 19:50, "Raghu Angadi" <[email protected]> wrote:

The core issue here is that there is no explicit concept of a 'checkpoint' in Beam (UnboundedSource has a method 'getCheckpointMark', but that refers to the checkpoint on the external source). Runners do checkpoint internally, as an implementation detail. Flink's checkpoint model is entirely different from Dataflow's and Spark's.
@StableReplay helps, but by design it does not explicitly talk about a checkpoint.

If you are looking to achieve some guarantees with a sink/DoFn, I think it is better to start with the requirements. I worked on the exactly-once sink for Kafka (see KafkaIO.write().withEOS()), where we essentially reshard the elements and assign sequence numbers to elements within each shard. Duplicates in replays are avoided based on these sequence numbers. The DoFn state API is used to buffer out-of-order replays.
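Raghu's description of the Kafka exactly-once sink can be illustrated with a small plain-Java model. This is a simplified sketch, not the KafkaIO.write().withEOS() implementation: a HashMap stands in for the per-shard DoFn state, sequence numbers within a shard are assumed to start at 0 with no gaps, and the class and method names are hypothetical. Elements whose sequence number was already written are dropped as replays, and elements that arrive ahead of the next expected number are buffered until the gap is filled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of per-shard sequence-number deduplication; not the KafkaIO code.
public class ShardSequencerSketch {

    // Per-shard state; a real pipeline would keep this in DoFn state, not in a HashMap.
    static final class ShardState {
        long nextExpected = 0;                                  // next sequence number to write
        final TreeMap<Long, String> pending = new TreeMap<>(); // out-of-order arrivals
    }

    private final Map<Integer, ShardState> states = new HashMap<>();

    // Accepts one (possibly replayed) element and returns the elements that can now be
    // written, in sequence order. Already-written sequence numbers are dropped as replays.
    List<String> accept(int shard, long seq, String value) {
        ShardState state = states.computeIfAbsent(shard, k -> new ShardState());
        List<String> ready = new ArrayList<>();
        if (seq < state.nextExpected) {
            return ready; // replay of an element that was already written
        }
        state.pending.putIfAbsent(seq, value); // a replay of a still-buffered element is ignored
        // Drain the contiguous run that starts at nextExpected.
        while (!state.pending.isEmpty() && state.pending.firstKey() == state.nextExpected) {
            ready.add(state.pending.pollFirstEntry().getValue());
            state.nextExpected++;
        }
        return ready;
    }

    public static void main(String[] args) {
        ShardSequencerSketch sink = new ShardSequencerSketch();
        System.out.println(sink.accept(0, 1, "b")); // prints [] (buffered, waiting for 0)
        System.out.println(sink.accept(0, 0, "a")); // prints [a, b]
        System.out.println(sink.accept(0, 0, "a")); // prints [] (replay dropped)
        System.out.println(sink.accept(0, 2, "c")); // prints [c]
    }
}
```

A sorted map (TreeMap) keeps the buffered elements ordered by sequence number, so draining the contiguous run that starts at the next expected number is straightforward.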
The implementation strategy works in Dataflow but not in Flink, which has a horizontal checkpoint. KafkaIO checks for compatibility.

On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau <[email protected]> wrote:

Hi guys,

The subject is a bit provocative, but the topic is real and comes up again and again in Beam usage: how can a DoFn handle some "chunking"?

The need is to be able to commit every N records, with N not too big.
The natural API for that in Beam is the bundle, but bundles are not reliable: they can be very small (Flink; we can say that is "ok" even if it has some performance impact) or too big (Spark uses full size / #workers).

The workaround is what we see in the ES I/O: a maxSize that triggers an eager flush. The issue is that the checkpoint is then not respected, and the same records can be processed multiple times.

Any plan to make this API reliable and controllable from a Beam point of view (at least as an upper bound)?
Thanks,
Romain Manni-Bucau

--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com
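The problem in Romain's original message (Nov 15) can be reproduced with a simplified plain-Java model of the "maxSize plus eager flush" workaround. This is not the ElasticsearchIO code: maxBatchSize, BatchingWriter and runBundle are hypothetical names, and an in-memory list stands in for the external store. The writer flushes whenever its buffer reaches maxBatchSize and flushes the rest in finishBundle; when a bundle fails after an eager flush and the runner retries the whole bundle, the eagerly flushed records are written twice.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the "maxSize + eager flush" pattern; not the real ElasticsearchIO code.
public class EagerFlushSketch {

    // Stands in for the external system (e.g. an index): records every write it receives.
    static final List<String> externalStore = new ArrayList<>();

    static final class BatchingWriter {
        private final int maxBatchSize; // hypothetical name for the eager-flush threshold
        private final List<String> batch = new ArrayList<>();

        BatchingWriter(int maxBatchSize) {
            this.maxBatchSize = maxBatchSize;
        }

        void startBundle() {
            batch.clear(); // a retried bundle starts from an empty buffer
        }

        void processElement(String doc) {
            batch.add(doc);
            if (batch.size() >= maxBatchSize) {
                flush(); // eager flush: data is written before the bundle is committed
            }
        }

        void finishBundle() {
            flush();
        }

        private void flush() {
            externalStore.addAll(batch);
            batch.clear();
        }
    }

    // Runs one bundle; failAfter simulates a worker failure before element `failAfter`
    // (pass -1 for no failure).
    static void runBundle(BatchingWriter writer, List<String> bundle, int failAfter) {
        writer.startBundle();
        for (int i = 0; i < bundle.size(); i++) {
            if (i == failAfter) {
                throw new IllegalStateException("simulated failure");
            }
            writer.processElement(bundle.get(i));
        }
        writer.finishBundle();
    }

    public static void main(String[] args) {
        List<String> bundle = List.of("d1", "d2", "d3", "d4", "d5");
        BatchingWriter writer = new BatchingWriter(2);
        try {
            runBundle(writer, bundle, 3); // d1, d2 are eagerly flushed, then the bundle fails
        } catch (IllegalStateException e) {
            runBundle(writer, bundle, -1); // the runner retries the whole bundle
        }
        System.out.println(externalStore); // prints [d1, d2, d1, d2, d3, d4, d5]
    }
}
```

If maxBatchSize is at least the bundle size, nothing is written before finishBundle, so a failure during processElement only loses buffered data; the batch size is then dictated by the runner's bundle size, which is the trade-off this thread is about.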
