Hmm, ESIO: https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L847
JDBCIO: https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L592
MongoIO: https://github.com/apache/beam/blob/master/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java#L657
etc...

They all use the same pattern: buffer elements in @ProcessElement, flush eagerly once a size threshold is reached, and flush the remainder in @FinishBundle - sketched below.
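For reference, the shared pattern boils down to something like this (a minimal sketch, not the actual code of any of these IOs; BulkClient is a hypothetical stand-in for the ES/JDBC/Mongo client):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.transforms.DoFn;

    // Sketch of the buffer-and-flush write pattern shared by the IOs linked above.
    class BatchingWriteFn extends DoFn<String, Void> {
      // Hypothetical client; a real IO holds an ES/JDBC/Mongo connection here.
      interface BulkClient extends java.io.Serializable {
        void bulkWrite(List<String> batch);
      }

      private final BulkClient client;
      private final int maxBatchSize;
      private transient List<String> batch;

      BatchingWriteFn(BulkClient client, int maxBatchSize) {
        this.client = client;
        this.maxBatchSize = maxBatchSize;
      }

      @StartBundle
      public void startBundle() {
        batch = new ArrayList<>();
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
        batch.add(c.element());
        if (batch.size() >= maxBatchSize) {
          flush(); // eager flush: this batch boundary is invisible to the runner
        }
      }

      @FinishBundle
      public void finishBundle() {
        flush(); // flush whatever is left when the runner decides the bundle ends
      }

      private void flush() {
        if (!batch.isEmpty()) {
          client.bulkWrite(batch); // one bulk RPC; a retried bundle replays these writes
          batch.clear();
        }
      }
    }

This keeps batches bounded, but the eager flush is exactly what the thread below debates: the runner knows nothing about these boundaries, so a retried bundle can replay writes that already went through.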
From what you wrote - and technically I agree, but in the current state my point is valid, I think - you should drop bundle from the whole user API and make it all @Internal, no?

Romain Manni-Bucau
@rmannibucau | Blog | Old Blog | Github | LinkedIn

2017-11-30 18:58 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
> Agree, but maybe we can inform the runner if wanted, no?
>
> Honestly, from my side, I'm fine with the current situation as it's runner specific.
>
> Regards
> JB
>
> On 11/30/2017 06:12 PM, Reuven Lax wrote:
>> I don't think it belongs in PipelineOptions, as bundle size is always a runner thing.
>>
>> We could consider adding a new generic RunnerOptions, however I'm not convinced all runners can actually support this.
>>
>> On Thu, Nov 30, 2017 at 6:09 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>
>> Guys,
>>
>> what about moving getMaxBundleSize from the Flink options to the pipeline options? I think all runners can support it, right? The Spark code probably needs the merge of the v2 before being able to implement it, but I don't see any blocker.
>>
>> wdyt?
>>
>> Romain Manni-Bucau
>> @rmannibucau | Blog | Old Blog | Github | LinkedIn
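For context, that knob is runner-specific today; with the Flink runner it is set roughly like this (a sketch against the beam-runners-flink FlinkPipelineOptions API; the 1000-element bound is a made-up value):

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class MaxBundleSizeExample {
      public static void main(String[] args) {
        // maxBundleSize lives on FlinkPipelineOptions, not on the generic
        // PipelineOptions - which is exactly what this thread debates moving.
        FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
        options.setMaxBundleSize(1000L); // upper-bound the elements per bundle
        Pipeline pipeline = Pipeline.create(options);
        // ... build the pipeline, then pipeline.run() ...
      }
    }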
>> 2017-11-19 8:19 GMT+01:00 Romain Manni-Bucau <rmannibu...@gmail.com>:
>> > @Eugene: "workaround" as it is specific to the IO each time and therefore still highlights a lack in the core.
>> >
>> > Other comments inline
>> >
>> > 2017-11-19 7:40 GMT+01:00 Robert Bradshaw <rober...@google.com.invalid>:
>> >> There is a possible fourth issue that we don't handle well: efficiency. For very large bundles, it may be advantageous to avoid replaying a bunch of idempotent operations if there were a way to record which ones we're sure went through. Not sure if that's the issue here (though one could possibly do this with SDFs; one can preemptively return periodically before an element (or a portion thereof) is done).
>> >
>> > +1, it also leads to the IO handling its own chunking/bundles and therefore solves all the issues at once.
>> >
>> >> On Sat, Nov 18, 2017 at 6:58 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>> >>> I disagree that the usage of document id in ES is a "workaround" - it does not address any *accidental* complexity <https://en.wikipedia.org/wiki/No_Silver_Bullet> coming from shortcomings of Beam; it addresses the *essential* complexity: a distributed system forces one to take it as a fact of nature that the same write (mutation) will happen multiple times, so if you want a mutation to happen "as if" it happened exactly once, the mutation itself must be idempotent <https://en.wikipedia.org/wiki/Idempotence>. Insert-with-id (upsert <https://en.wikipedia.org/wiki/Merge_(SQL)>) is a classic example of an idempotent mutation, and it's very good that Elasticsearch provides it - if it didn't, no matter how good an API Beam had, achieving exactly-once writes would be theoretically impossible. Are we in agreement on this so far?
>> >>>
>> >>> Next: you seem to be discussing 3 issues together, all of which are valid issues, but they seem unrelated to me:
>> >>> 1. Exactly-once mutation
>> >>> 2. Batching multiple mutations into one RPC
>> >>> 3. Backpressure
>> >>>
>> >>> #1: was considered above. The system the IO is talking to has to support idempotent mutations, in an IO-specific way, and the IO has to take advantage of them, in the IO-specific way - end of story.
>> >
>> > Agree, but don't forget the original point was about "chunks" and not individual records.
>> >
>> >>> #2: a batch of idempotent operations is also idempotent, so this doesn't add anything new semantically. Syntactically - Beam already allows you to write your own batching by notifying you of permitted batch boundaries (Start/FinishBundle). Sure, it could do more, but from my experience the batching in the IOs I've seen is one of the easiest and least error-prone parts, so I don't see something worth an extended discussion here.
>> >
>> > "Beam already allows you to write your own batching by notifying you of permitted batch boundaries (Start/FinishBundle)" is wrong, since the bundle is potentially the whole PCollection (Spark), so this is not even an option until you use the SDF (back to the same point). Once again, the API looks fine but no implementation makes it true. It would be easy to change in Spark; Flink can be OK since it targets more the streaming case; not sure about the others - any idea?
>> >
>> >>> #3: handling backpressure is a complex problem with multiple facets: 1) how do you know you're being throttled, and by how much are you exceeding the external system's capacity?
>> >
>> > This is the whole point of backpressure: the system sends it back to you (header-like or a status technique in general).
>> >
>> >>> 2) how do you communicate this signal to the runner?
>> >
>> > You are a client, so you get the meta in the response - whatever the techno.
>> >
>> >>> 3) what does the runner do in response?
>> >
>> > The runner does nothing, but the IO adapts its handling as mentioned before (wait and retry, skip, ... depending on the config).
>> >
>> >>> 4) how do you wait until it's ok to try again?
>> >
>> > This is one point to probably enhance in Beam, but waiting in the processing is an option if the source has some buffering; otherwise it requires a buffer fallback with a max size if the wait mode is activated.
>> >
>> >>> You seem to be advocating for solving one facet of this problem, which is: you want it to be possible to signal to the runner "I'm being throttled, please end the bundle", right? If so - I think this (ending the bundle) is unnecessary: the DoFn can simply do an exponential back-off sleep loop.
>> >
>> > Agree, I never said the runner should know, but GBK+output doesn't work because you don't own the GBK.
>> >
>> >>> This is e.g. what DatastoreIO does <https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java#L1318> and this is in general how most systems I've seen handle backpressure. Is there something I'm missing? In particular, is there any compelling reason why you think it'd be beneficial e.g. for DatastoreIO to commit the results of the bundle so far before processing other elements?
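A minimal sketch of that back-off loop (hypothetical BulkClient and ThrottledException; not DatastoreV1's actual code):

    import java.util.List;

    class BackoffWriter {
      // Hypothetical client call and throttling signal.
      interface BulkClient {
        void bulkWrite(List<String> batch) throws ThrottledException;
      }

      static class ThrottledException extends Exception {}

      // Exponential back-off sleep loop: block inside the DoFn instead of
      // asking the runner to end the bundle.
      static void writeWithBackoff(BulkClient client, List<String> batch)
          throws ThrottledException, InterruptedException {
        long backoffMillis = 100;
        for (int attempt = 1; attempt < 10; attempt++) {
          try {
            client.bulkWrite(batch); // the mutation itself should be idempotent (see #1)
            return;
          } catch (ThrottledException e) {
            Thread.sleep(backoffMillis); // wait out the throttling in place
            backoffMillis = Math.min(backoffMillis * 2, 60_000L);
          }
        }
        client.bulkWrite(batch); // final attempt; let the exception propagate
      }
    }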
>> > It was more about ensuring you validate early a subset of the whole bundle and avoid reprocessing it if it fails later.
>> >
>> > So to summarize, I see 2 outcomes:
>> >
>> > 1. implement SDF in all runners
>> > 2. make the bundle size upper-bounded - through a pipeline option - in all runners; not sure this one is doable everywhere since I mainly checked the Spark case
>> >
>> >>> Again, it might be that I'm still misunderstanding what you're trying to say. One of the things it would help to clarify would be - exactly what do you mean by "how batch frameworks solved that for years": can you point at an existing API in some other framework that achieves what you want?
>> >>>
>> >>> On Sat, Nov 18, 2017 at 1:02 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>> >>> > Eugene, the point - and the issue with a single sample - is that you can always find *workarounds* on a case-by-case basis, like the id one with ES, but Beam doesn't solve the problem as a framework.
>> >>> >
>> >>> > From my past, I clearly don't see how batch frameworks solved that for years and Beam is not able to do it - keep in mind it is the same kind of techno, it just uses different sources and bigger clusters, so there is no real reason not to have the same feature quality. The only potential reason I see is that there is no tracking of the state in the cluster - e2e. But I don't see why there wouldn't be. Do I miss something here?
>> >>> >
>> >>> > An example could be: take a GitHub crawler computing stats on the whole set of GitHub repos, based on a REST client. You will need to handle the rate limit and likely want to "commit" each time you reach a rate limit, with likely some buffering strategy with a max size before really waiting. How do you do it with a GBK independent of your DoFn? You are not able to compose the fns correctly between them :(.
>> >>> >
>> >>> > Le 18 nov. 2017 20:48, "Eugene Kirpichov" <kirpic...@google.com.invalid> a écrit :
>> >>> >
>> >>> > After giving this thread my best attempt at understanding exactly what the problem and the proposed solution are, I'm afraid I still fail to understand both. To reiterate, I think the only way to make progress here is to be more concrete: (quote) take some IO that you think could be easier to write with your proposed API, give the contents of a hypothetical PCollection being written to this IO, give the code of a hypothetical DoFn implementing the write using your API, and explain what you'd expect to happen at runtime. I'm going to re-engage in this thread after such an example is given.
>> >>> >
>> >>> > On Sat, Nov 18, 2017, 5:00 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>> >>> > > First, bundle retry is unusable with some runners like Spark, where the bundle size is the collection size / the number of workers. This means a user can't use the bundle API or feature reliably and portably - which is Beam's promise.
>> >>> > > Aligning chunking and bundles would guarantee it, but that can be undesired; that is why I thought it could be another feature.
>> >>> > >
>> >>> > > GBK works until the IO needs to know about it, and both concepts are not always orthogonal - backpressure-like systems are a trivial common example. This means the IO (DoFn) must be able to do it itself at some point.
>> >>> > >
>> >>> > > Also note the GBK works only if the IO can take a list, which is never the case today.
>> >>> > >
>> >>> > > The big questions for me are: is SDF the way to go, since it provides the needed API but is not yet supported? What about existing IOs? Should Beam provide an auto-wrapping of DoFns for that pre-aggregated support and simulate bundles for the actual IO impl to keep the existing API?
>> >>> > >
>> >>> > > Le 17 nov. 2017 19:20, "Raghu Angadi" <rang...@google.com.invalid> a écrit :
>> >>> > >
>> >>> > > On Fri, Nov 17, 2017 at 1:02 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>> >>> > > > Yep, just take the ES IO: if a part of a bundle fails, you are in an unmanaged state. This is the case for all O (of IO ;)). The issue is not so much about "1" (the code it takes) but more the fact it doesn't integrate with runner features and retries potentially: what happens if a bundle has a failure? => undefined today. 2. I'm fine with it as long as we know exactly what happens when we restart after a bundle failure. With ES the timestamp can be used, for instance.
>> >>> > >
>> >>> > > This deterministic batching can be achieved even now with an extra GroupByKey (and if you want ordering on top of that, you will need another GBK). Don't know if that is too costly in your case. I would need a bit more detail on handling ES IO write retries to see if it could be simplified. Note that retries occur with or without any failures in your DoFn.
>> >>> > >
>> >>> > > The biggest negative with the GBK approach is that it doesn't provide the same guarantees on Flink.
>> >>> > >
>> >>> > > I don't see how GroupIntoBatches in Beam provides specific guarantees on deterministic batches.
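A sketch of that extra-GroupByKey resharding (assuming string elements; the key is a pure function of the element, so the grouping - and hence each batch - is reproducible across retries; the 50-shard count is made up):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    PCollection<String> records = ...; // the records to write
    records
        .apply("AssignDeterministicShard", ParDo.of(new DoFn<String, KV<Integer, String>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // Deterministic key: the same element always lands in the same shard.
            c.output(KV.of(Math.floorMod(c.element().hashCode(), 50), c.element()));
          }
        }))
        .apply(GroupByKey.create())
        .apply("WriteOneBatchPerShard", ParDo.of(new DoFn<KV<Integer, Iterable<String>>, Void>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // c.element().getValue() is the whole batch for this shard;
            // issue one bulk write for it here (hypothetical client call).
          }
        }));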
>> >>> > > > Thinking about it, the SDF is really a way to do it, since the SDF will manage the bulking and, associated with the runner "retry", it seems it covers the needs.
>> >>> > > >
>> >>> > > > Romain Manni-Bucau
>> >>> > > > @rmannibucau | Blog | Old Blog | Github | LinkedIn
>> >>> > > >
>> >>> > > > 2017-11-17 9:23 GMT+01:00 Eugene Kirpichov <kirpic...@google.com.invalid>:
>> >>> > > > > I must admit I'm still failing to understand the problem, so let's step back even further.
>> >>> > > > >
>> >>> > > > > Could you give an example of an IO that is currently difficult to implement specifically because of the lack of the feature you're talking about?
>> >>> > > > >
>> >>> > > > > I'm asking because I've reviewed almost all Beam IOs and don't recall seeing a similar problem. Sure, a lot of IOs do batching within a bundle, but 1) it doesn't take up much code (granted, it would be even easier if Beam did it for us) and 2) I don't remember any of them requiring the batches to be deterministic, and I'm having a hard time imagining what kind of storage system would be able to deduplicate if batches were deterministic but wouldn't be able to deduplicate if they weren't.
>> >>> > > > >
>> >>> > > > > On Thu, Nov 16, 2017 at 11:50 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>> >>> > > > >> Ok, let me try to step back and summarize what we have today and what I miss:
>> >>> > > > >>
>> >>> > > > >> 1. we can handle chunking in Beam through group-into-batches (or equivalent), but it is not built into the transforms (IOs) and it is controlled from outside the transforms, so there is no way for a transform to do it properly without itself handling a composition and links between multiple DoFns to get notifications and potentially react properly or handle backpressure from its backend
>> >>> > > > >> 2. there is no restart feature, because there is no real state handling at the moment. This sounds fully delegated to the runner, but I was hoping for more guarantees from the API to be able to restart a pipeline (mainly batch, since it can be irrelevant or delegated to the backend for streams) and handle only the not-yet-committed records, so it requires some persistence outside the main IO storages to do it properly. Note this is somehow similar to the monitoring topic, which misses persistence ATM, so it could end up with Beam having a pluggable storage for a few concerns.
>> >>> > > > >>
>> >>> > > > >> Short term I would be happy with 1 solved properly; long term I hope 2 will be tackled without workarounds requiring custom wrapping of IOs to use a custom state persistence.
>> >>> > > > >>
>> >>> > > > >> Romain Manni-Bucau
>> >>> > > > >> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>> >>> > > > >>
>> >>> > > > >> 2017-11-17 7:44 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>> >>> > > > >> > Thanks for the explanation. Agree, we might be talking about different things using the same wording.
>> >>> > > > >> >
>> >>> > > > >> > I'm also struggling to understand the use case (for a generic DoFn).
>> >>> > > > >> > Regards
>> >>> > > > >> > JB
>> >>> > > > >> >
>> >>> > > > >> > On 11/17/2017 07:40 AM, Eugene Kirpichov wrote:
>> >>> > > > >> >> To avoid spending a lot of time pursuing a false path, I'd like to say straight up that SDF is definitely not going to help here, despite the fact that its API includes the term "checkpoint". In SDF, the "checkpoint" captures the state of processing within a single element. If you're applying an SDF to 1000 elements, it will, like any other DoFn, be applied to each of them independently and in parallel, and you'll have 1000 checkpoints capturing the state of processing each of these elements, which is probably not what you want.
>> >>> > > > >> >>
>> >>> > > > >> >> I'm afraid I still don't understand what kind of checkpoint you need, if it is not just deterministic grouping into batches. "Checkpoint" is a very broad term and it's very possible that everybody in this thread is talking about different things when saying it. So it would help if you could give a more concrete example: for example, take some IO that you think could be easier to write with your proposed API, give the contents of a hypothetical PCollection being written to this IO, give the code of a hypothetical DoFn implementing the write using your API, and explain what you'd expect to happen at runtime.
>> >>> > > > >> >>
>> >>> > > > >> >> On Thu, Nov 16, 2017 at 10:33 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>> >>> > > > >> >>> @Eugene: yes, and the other alternative of Reuven too, but it is still 1. relying on timers, 2. not really checkpointed.
>> >>> > > > >> >>>
>> >>> > > > >> >>> In other words, it seems all the solutions are to create a chunk of size 1, replayable, to fake the lack of chunking in the framework. This always implies chunk handling outside the component (typically before it, for an output). My point is that I think IOs need it in their own "internals", or at least need to control it themselves, since the chunk size is part of the IO handling most of the time.
>> >>> > > > >> >>>
>> >>> > > > >> >>> I think JB spoke of the same "group before" trick using restrictions, which can work, I have to admit, if SDFs are implemented by the runners. Is there a roadmap/status on that? Last time I checked, SDF was a great API without support :(.
>> >>> > > > >> >>> Romain Manni-Bucau
>> >>> > > > >> >>> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>> >>> > > > >> >>>
>> >>> > > > >> >>> 2017-11-17 7:25 GMT+01:00 Eugene Kirpichov <kirpic...@google.com.invalid>:
>> >>> > > > >> >>>> JB, not sure what you mean? SDFs and triggers are unrelated, and the post doesn't mention the word. Did you mean something else, e.g. restriction perhaps? Either way I don't think SDFs are the solution here; SDFs have to do with the ability to split the processing of *a single element* over multiple calls, whereas Romain I think is asking for repeatable grouping of *multiple* elements.
>> >>> > > > >> >>>>
>> >>> > > > >> >>>> Romain - does https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java do what you want?
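For reference, using it looks roughly like this (a sketch; it applies to keyed input and relies on state/timers under the hood, and batch membership is still not guaranteed to be stable across retries, which is the crux of the discussion):

    import org.apache.beam.sdk.transforms.GroupIntoBatches;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    PCollection<KV<String, String>> keyed = ...; // records keyed e.g. by shard
    PCollection<KV<String, Iterable<String>>> batches =
        keyed.apply(GroupIntoBatches.<String, String>ofSize(100)); // up to 100 values per batch
    // A downstream DoFn then issues one bulk write per Iterable<String>.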
>> >>> > > > >> >>>> On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>> >>> > > > >> >>>>> It sounds like the "Trigger" in the Splittable DoFn, no?
>> >>> > > > >> >>>>>
>> >>> > > > >> >>>>> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
>> >>> > > > >> >>>>>
>> >>> > > > >> >>>>> Regards
>> >>> > > > >> >>>>> JB
>> >>> > > > >> >>>>>
>> >>> > > > >> >>>>> On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote:
>> >>> > > > >> >>>>>> it gives the fn/transform the ability to save a state - it can get back to it on "restart" / whatever unit we can use, probably runner dependent? Without that you need to rewrite all IO usage with something like the previous pattern, which makes the IO not self-sufficient and kind of pushes the entry cost and usage of Beam way further.
>> >>> > > > >> >>>>>>
>> >>> > > > >> >>>>>> In my mind it is exactly what jbatch/spring-batch uses, but adapted to the Beam (stream in particular) case.
>> >>> > > > >> >>>>>>
>> >>> > > > >> >>>>>> Romain Manni-Bucau
>> >>> > > > >> >>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>> >>> > > > >> >>>>>>
>> >>> > > > >> >>>>>> 2017-11-17 6:49 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
>> >>> > > > >> >>>>>>> Romain,
>> >>> > > > >> >>>>>>>
>> >>> > > > >> >>>>>>> Can you define what you mean by checkpoint? What are the semantics, what does it accomplish?
>> >>> > > > >> >>>>>>>
>> >>> > > > >> >>>>>>> Reuven
>> >>> > > > >> >>>>>>>
>> >>> > > > >> >>>>>>> On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>> >>> > > > >> >>>>>>>> Yes, what I proposed earlier was:
>> >>> > > > >> >>>>>>>>
>> >>> > > > >> >>>>>>>> I. checkpoint marker:
>> >>> > > > >> >>>>>>>>
>> >>> > > > >> >>>>>>>> @AnyBeamAnnotation
>> >>> > > > >> >>>>>>>> @CheckpointAfter
>> >>> > > > >> >>>>>>>> public void someHook(SomeContext ctx);
>> >>> > > > >> >>>>>>>>
>> >>> > > > >> >>>>>>>> II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new CountingAlgo()))
>> >>> > > > >> >>>>>>>>
>> >>> > > > >> >>>>>>>> III. (I like this one less)
>> >>> > > > >> >>>>>>>>
>> >>> > > > >> >>>>>>>> // in the dofn
>> >>> > > > >> >>>>>>>> @CheckpointTester
>> >>> > > > >> >>>>>>>> public boolean shouldCheckpoint();
>> >>> > > > >> >>>>>>>>
>> >>> > > > >> >>>>>>>> IV. @Checkpointer Serializable getCheckpoint(); // in the dofn, per element
>> >>> > > > >> >>>>>>>>
>> >>> > > > >> >>>>>>>> Romain Manni-Bucau
>> >>> > > > >> >>>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>> >>> > > > >> >>>>>>>>
>> >>> > > > >> >>>>>>>> 2017-11-17 6:06 GMT+01:00 Raghu Angadi <rang...@google.com.invalid>:
>> >>> > > > >> >>>>>>>>> How would you define it (a rough API is fine)? Without more details, it is not easy to see the wider applicability and feasibility in runners.
>> >>> > > > >> >>>>>>>>>
>> >>> > > > >> >>>>>>>>> On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>> >>> > > > >> >>>>>>>>>> This is a fair summary of the current state, but it is also where Beam can have a very strong added value and make big data great and smooth.
>> >>> > > > >> >>>>>>>>>>
>> >>> > > > >> >>>>>>>>>> Instead of this replay feature, isn't checkpointing doable? In particular with SDF, no?
>> >>> > > > >> >>>>>>>>>>
>> >>> > > > >> >>>>>>>>>> Le 16 nov. 2017 19:50, "Raghu Angadi" <rang...@google.com.invalid> a écrit :
>> >>> > > > >> >>>>>>>>>>> Core issue here is that there is no explicit concept of 'checkpoint' in Beam (UnboundedSource has a method 'getCheckpointMark', but that refers to the checkpoint on the external source). Runners do checkpoint internally as an implementation detail. Flink's checkpoint model is entirely different from Dataflow's and Spark's.
>> >>> > > > >> >>>>>>>>>>>
>> >>> > > > >> >>>>>>>>>>> @StableReplay helps, but it does not explicitly talk about a checkpoint, by design.
>> >>> > > > >> >>>>>>>>>>>
>> >>> > > > >> >>>>>>>>>>> If you are looking to achieve some guarantees with a sink/DoFn, I think it is better to start with the requirements. I worked on an exactly-once sink for Kafka (see KafkaIO.write().withEOS()), where we essentially reshard the elements and assign sequence numbers to the elements within each shard. Duplicates in replays are avoided based on these sequence numbers. The DoFn state API is used to buffer out-of-order replays. The implementation strategy works in Dataflow but not in Flink, which has a horizontal checkpoint. KafkaIO checks for compatibility.
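Wiring that sink up looks roughly like this (a usage sketch; the brokers, topic, and shard count are made up):

    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.kafka.common.serialization.StringSerializer;

    PCollection<KV<String, String>> records = ...; // what we want to publish
    records.apply(KafkaIO.<String, String>write()
        .withBootstrapServers("broker-1:9092")      // made-up broker
        .withTopic("results")                       // made-up topic
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class)
        // Exactly-once mode: reshard into 20 shards and sequence-number the
        // elements per shard so replays can be deduplicated.
        .withEOS(20, "eos-sink-group"));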
>> >>> > > > >> >>>>>>>>>>>
>> >>> > > > >> >>>>>>>>>>> On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>> >>> > > > >> >>>>>>>>>>>> Hi guys,
>> >>> > > > >> >>>>>>>>>>>>
>> >>> > > > >> >>>>>>>>>>>> The subject is a bit provocative, but the topic is real and comes up again and again with Beam usage: how can a DoFn handle some "chunking"?
>> >>> > > > >> >>>>>>>>>>>>
>> >>> > > > >> >>>>>>>>>>>> The need is to be able to commit every N records, with N not too big.
>> >>> > > > >> >>>>>>>>>>>>
>> >>> > > > >> >>>>>>>>>>>> The natural API for that in Beam is the bundle one, but bundles are not reliable since they can be very small (Flink) - we can say it is "ok" even if it has some perf impact - or too big (Spark does full size / #workers).
>> >>> > > > >> >>>>>>>>>>>>
>> >>> > > > >> >>>>>>>>>>>> The workaround is what we see in the ES I/O: a maxSize which does an eager flush. The issue is that then the checkpoint is not respected and you can process the same records multiple times.
>> >>> > > > >> >>>>>>>>>>>>
>> >>> > > > >> >>>>>>>>>>>> Any plan to make this API reliable and controllable from a Beam point of view (at least in a max manner)?
>> >>> > > > >> >>>>>>>>>>>> Thanks,
>> >>> > > > >> >>>>>>>>>>>> Romain Manni-Bucau
>> >>> > > > >> >>>>>>>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com