I don't think it belongs in PipelineOptions, as bundle size is always a runner thing.
We could consider adding a new generic RunnerOptions, however I'm not convinced all runners can actually support this. On Thu, Nov 30, 2017 at 6:09 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote: > Guys, > > what about moving getMaxBundleSize from flink options to pipeline > options. I think all runners can support it right? Spark code needs > the merge of the v2 before being able to be implemented probably but I > don't see any blocker. > > wdyt? > > Romain Manni-Bucau > @rmannibucau | Blog | Old Blog | Github | LinkedIn > > > 2017-11-19 8:19 GMT+01:00 Romain Manni-Bucau <rmannibu...@gmail.com>: > > @Eugene: "workaround" as specific to the IO each time and therefore > > still highlight a lack in the core. > > > > Other comments inline > > > > > > 2017-11-19 7:40 GMT+01:00 Robert Bradshaw <rober...@google.com.invalid>: > >> There is a possible fourth issue that we don't handle well: efficiency. > For > >> very large bundles, it may be advantageous to avoid replaying a bunch of > >> idempotent operations if there were a way to record what ones we're sure > >> went through. Not sure if that's the issue here (though one could > possibly > >> do this with SDFs, one can preemptively returning periodically before an > >> element (or portion thereof) is done). > > > > +1, also lead to the IO handling its own chunking/bundles and > > therefore solves all issues at once. 
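The efficiency point above about replaying idempotent operations is easiest to see with a toy model. A minimal plain-Java sketch (not Beam code; `UpsertStore` is a made-up name): replaying a batch of insert-with-id mutations leaves the store in the same state, which is why bundle retries are harmless against such a sink.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of an idempotent insert-with-id ("upsert") sink, as in the
// Elasticsearch document-id discussion. Replaying a batch is harmless
// because each mutation is keyed by a stable id.
public class UpsertStore {
    private final Map<String, String> docs = new HashMap<>();

    // Idempotent mutation: writing the same (id, body) twice is a no-op.
    public void upsert(String id, String body) {
        docs.put(id, body);
    }

    // A batch of idempotent mutations is itself idempotent.
    public void writeBatch(Map<String, String> batch) {
        batch.forEach(this::upsert);
    }

    public int size() {
        return docs.size();
    }

    public String get(String id) {
        return docs.get(id);
    }

    public static void main(String[] args) {
        UpsertStore store = new UpsertStore();
        Map<String, String> batch = Map.of("doc-1", "a", "doc-2", "b");
        store.writeBatch(batch);
        store.writeBatch(batch); // simulated bundle retry: same end state
        System.out.println(store.size()); // prints 2
    }
}
```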
> > > >> > >> On Sat, Nov 18, 2017 at 6:58 PM, Eugene Kirpichov < > >> kirpic...@google.com.invalid> wrote: > >> > >>> I disagree that the usage of document id in ES is a "workaround" - it > does > >>> not address any *accidental *complexity > >>> <https://en.wikipedia.org/wiki/No_Silver_Bullet> coming from > shortcomings > >>> of Beam, it addresses the *essential* complexity that a distributed > system > >>> forces one to take it as a fact of nature that the same write > >>> (mutation) will happen multiple times, so if you want a mutation to > happen > >>> "as-if" it happened exactly once, the mutation itself must be > idempotent > >>> <https://en.wikipedia.org/wiki/Idempotence>. Insert-with-id (upsert > >>> <https://en.wikipedia.org/wiki/Merge_(SQL)>) is a classic example of > an > >>> idempotent mutation, and it's very good that Elasticsearch provides it > - if > >>> it didn't, no matter how good of an API Beam had, achieving > exactly-once > >>> writes would be theoretically impossible. Are we in agreement on this > so > >>> far? > >>> > >>> Next: you seem to be discussing 3 issues together, all of which are > valid > >>> issues, but they seem unrelated to me: > >>> 1. Exactly-once mutation > >>> 2. Batching multiple mutations into one RPC. > >>> 3. Backpressure > >>> > >>> #1: was considered above. The system the IO is talking to has to > support > >>> idempotent mutations, in an IO-specific way, and the IO has to take > >>> advantage of them, in the IO-specific way - end of story. > > > > Agree but don't forget the original point was about "chunks" and not > > individual records. > > > >>> > >>> #2: a batch of idempotent operations is also idempotent, so this > doesn't > >>> add anything new semantically. Syntactically - Beam already allows you > to > >>> write your own batching by notifying you of permitted batch boundaries > >>> (Start/FinishBundle). 
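The Start/FinishBundle batching pattern mentioned just above can be sketched without the Beam SDK; `BatchingWriter`, `process`, and `finishBundle` are illustrative stand-ins for a DoFn's `@ProcessElement`/`@FinishBundle` methods, not real Beam types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of the batching pattern used by many Beam IOs:
// buffer elements, flush when the buffer reaches maxBatchSize, and
// flush the remainder when the bundle finishes.
public class BatchingWriter<T> {
    private final int maxBatchSize;
    private final Consumer<List<T>> flushFn; // stands in for the RPC call
    private final List<T> buffer = new ArrayList<>();

    public BatchingWriter(int maxBatchSize, Consumer<List<T>> flushFn) {
        this.maxBatchSize = maxBatchSize;
        this.flushFn = flushFn;
    }

    // Analogue of @ProcessElement: eager flush at the size bound.
    public void process(T element) {
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    // Analogue of @FinishBundle: flush whatever is left.
    public void finishBundle() {
        if (!buffer.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        flushFn.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = new ArrayList<>();
        BatchingWriter<Integer> writer = new BatchingWriter<>(2, batches::add);
        for (int i = 0; i < 5; i++) {
            writer.process(i);
        }
        writer.finishBundle();
        System.out.println(batches); // [[0, 1], [2, 3], [4]]
    }
}
```

This also illustrates the objection raised later in the thread: the bundle boundary (`finishBundle`) is chosen by the runner, so the last batch's size is whatever the runner's bundle happened to leave over.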
Sure, it could do more, but from my experience > the > >>> batching in IOs I've seen is one of the easiest and least error-prone > >>> parts, so I don't see something worth an extended discussion here. > > > > "Beam already allows you to > > write your own batching by notifying you of permitted batch boundaries > > (Start/FinishBundle)" > > > > Is wrong since the bundle is potentially the whole PCollection (spark) > > so this is not even an option until you use the SDF (back to the same > > point). > > Once again the API looks fine but no implementation makes it true. It > > would be easy to change it in spark, flink can be ok since it targets > > more the streaming case, not sure of others, any idea? > > > > > >>> > >>> #3: handling backpressure is a complex problem with multiple facets: > 1) how > >>> do you know you're being throttled, and by how much are you exceeding > the > >>> external system's capacity? > > > > This is the whole point of backpressure, the system sends it back to > > you (header like or status technic in general) > > > >>> 2) how do you communicate this signal to the > >>> runner? > > > > You are a client so you get the meta in the response - whatever techno. > > > >>> 3) what does the runner do in response? > > > > Runner nothing but the IO adapts its handling as mentionned before > > (wait and retry, skip, ... depending the config) > > > >>> 4) how do you wait until > >>> it's ok to try again? > > > > This is one point to probably enhance in beam but waiting in the > > processing is an option if the source has some buffering otherwise it > > requires to have a buffer fallback and max size if the wait mode is > > activated. > > > >>> You seem to be advocating for solving one facet of this problem, which > is: > >>> you want it to be possible to signal to the runner "I'm being > throttled, > >>> please end the bundle", right? 
If so - I think this (ending the > bundle) is > >>> unnecessary: the DoFn can simply do an exponential back-off sleep loop. > > > > Agree, never said the runner should know but GBK+output doesnt work > > cause you dont own the GBK. > > > >>> This is e.g. what DatastoreIO does > >>> <https://github.com/apache/beam/blob/master/sdks/java/io/ > >>> google-cloud-platform/src/main/java/org/apache/beam/sdk/ > >>> io/gcp/datastore/DatastoreV1.java#L1318> > >>> and > >>> this is in general how most systems I've seen handle backpressure. Is > there > >>> something I'm missing? In particular, is there any compelling reason > why > >>> you think it'd be beneficial e.g. for DatastoreIO to commit the > results of > >>> the bundle so far before processing other elements? > > > > It was more about ensuring you validate early a subset of the whole > > bundle and avoid to reprocess it if it fails later. > > > > > > So to summarize I see 2 outcomes: > > > > 1. impl SDF in all runners > > 2. make the bundle size upper bounded - through a pipeline option - in > > all runners, not sure this one is doable everywhere since I mainly > > checked spark case > > > >>> > >>> Again, it might be that I'm still misunderstanding what you're trying > to > >>> say. One of the things it would help to clarify would be - exactly > what do > >>> you mean by "how batch frameworks solved that for years": can you > point at > >>> an existing API in some other framework that achieves what you want? > >>> > >>> On Sat, Nov 18, 2017 at 1:02 PM Romain Manni-Bucau < > rmannibu...@gmail.com> > >>> wrote: > >>> > >>> > Eugene, point - and issue with a single sample - is you can always > find > >>> > *workarounds* on a case by case basis as the id one with ES but beam > >>> doesnt > >>> > solve the problem as a framework. 
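The exponential back-off sleep loop suggested above (the approach DatastoreIO takes) looks roughly like the following; the helper name and its parameters are hypothetical, for illustration only.

```java
import java.util.concurrent.Callable;

// Sketch of handling backpressure inside element processing: instead of
// asking the runner to end the bundle, retry the throttled call with
// exponentially growing sleeps, up to a maximum number of attempts.
public class BackoffRetry {
    public static <T> T callWithBackoff(Callable<T> op, int maxAttempts, long initialBackoffMillis)
            throws Exception {
        long backoff = initialBackoffMillis;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.call();
            } catch (Exception e) { // e.g. a "throttled" response from the backend
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoff);
                    backoff *= 2; // exponential growth between attempts
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails twice (simulated throttling), then succeeds on the third call.
        String result = callWithBackoff(() -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("throttled");
            }
            return "ok";
        }, 5, 1);
        System.out.println(result + " after " + calls[0] + " calls"); // ok after 3 calls
    }
}
```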
> >>> > > >>> > From my past, I clearly dont see how batch frameworks solved that for > >>> years > >>> > and beam is not able to do it - keep in mind it is the same kind of > >>> techno, > >>> > it just uses different sources and bigger clusters so no real reason > to > >>> not > >>> > have the same feature quality. The only potential reason i see is > there > >>> is > >>> > no tracking of the state into the cluster - e2e. But i dont see why > there > >>> > wouldnt be. Do I miss something here? > >>> > > >>> > An example could be: take a github crawler computing stats on the > whole > >>> > girhub repos which is based on a rest client as example. You will > need to > >>> > handle the rate limit and likely want to "commit" each time you > reach a > >>> > rate limit with likely some buffering strategy with a max size before > >>> > really waiting. How do you do it with a GBK independent of your > dofn? You > >>> > are not able to compose correctly the fn between them :(. > >>> > > >>> > > >>> > Le 18 nov. 2017 20:48, "Eugene Kirpichov" > <kirpic...@google.com.invalid> > >>> a > >>> > écrit : > >>> > > >>> > After giving this thread my best attempt at understanding exactly > what is > >>> > the problem and the proposed solution, I'm afraid I still fail to > >>> > understand both. To reiterate, I think the only way to make progress > here > >>> > is to be more concrete: (quote) take some IO that you think could be > >>> easier > >>> > to write with your proposed API, give the contents of a hypothetical > >>> > PCollection being written to this IO, give the code of a hypothetical > >>> DoFn > >>> > implementing the write using your API, and explain what you'd expect > to > >>> > happen at runtime. I'm going to re-engage in this thread after such > an > >>> > example is given. 
> >>> > > >>> > On Sat, Nov 18, 2017, 5:00 AM Romain Manni-Bucau < > rmannibu...@gmail.com> > >>> > wrote: > >>> > > >>> > > First bundle retry is unusable with dome runners like spark where > the > >>> > > bundle size is the collection size / number of work. This means a > user > >>> > cant > >>> > > use bundle API or feature reliably and portably - which is beam > >>> promise. > >>> > > Aligning chunking and bundles would guarantee that bit can be not > >>> > desired, > >>> > > that is why i thought it can be another feature. > >>> > > > >>> > > GBK works until the IO knows about that and both concepts are not > >>> always > >>> > > orthogonal - backpressure like systems is a trivial common example. > >>> This > >>> > > means the IO (dofn) must be able to do it itself at some point. > >>> > > > >>> > > Also note the GBK works only if the IO can take a list which is > never > >>> the > >>> > > case today. > >>> > > > >>> > > Big questions for me are: is SDF the way to go since it provides > the > >>> > needed > >>> > > API bit is not yet supported? What about existing IO? Should beam > >>> provide > >>> > > an auto wrapping of dofn for that pre-aggregated support and > simulate > >>> > > bundles to the actual IO impl to keep the existing API? > >>> > > > >>> > > > >>> > > Le 17 nov. 2017 19:20, "Raghu Angadi" <rang...@google.com.invalid> > a > >>> > > écrit : > >>> > > > >>> > > On Fri, Nov 17, 2017 at 1:02 AM, Romain Manni-Bucau < > >>> > rmannibu...@gmail.com > >>> > > > > >>> > > wrote: > >>> > > > >>> > > > Yep, just take ES IO, if a part of a bundle fails you are in an > >>> > > > unmanaged state. This is the case for all O (of IO ;)). Issue is > not > >>> > > > much about "1" (the code it takes) but more the fact it doesn't > >>> > > > integrate with runner features and retries potentially: what > happens > >>> > > > if a bundle has a failure? => undefined today. 2. 
I'm fine with > it > >>> > > > while we know exactly what happens when we restart after a bundle > >>> > > > failure. With ES the timestamp can be used for instance. > >>> > > > > >>> > > > >>> > > This deterministic batching can be achieved even now with an extra > >>> > > GroupByKey (and if you want ordering on top of that, will need > another > >>> > > GBK). Don't know if that is too costly in your case. I would need > bit > >>> > more > >>> > > details on handling ES IO write retries to see it could be > simplified. > >>> > Note > >>> > > that retries occur with or without any failures in your DoFn. > >>> > > > >>> > > The biggest negative with GBK approach is that it doesn't provide > same > >>> > > guarantees on Flink. > >>> > > > >>> > > I don't see how GroubIntoBatches in Beam provides specific > guarantees > >>> on > >>> > > deterministic batches. > >>> > > > >>> > > Thinking about it the SDF is really a way to do it since the SDF > will > >>> > > > manage the bulking and associated with the runner "retry" it > seems it > >>> > > > covers the needs. > >>> > > > > >>> > > > Romain Manni-Bucau > >>> > > > @rmannibucau | Blog | Old Blog | Github | LinkedIn > >>> > > > > >>> > > > > >>> > > > 2017-11-17 9:23 GMT+01:00 Eugene Kirpichov > >>> > <kirpic...@google.com.invalid > >>> > > >: > >>> > > > > I must admit I'm still failing to understand the problem, so > let's > >>> > step > >>> > > > > back even further. > >>> > > > > > >>> > > > > Could you give an example of an IO that is currently difficult > to > >>> > > > implement > >>> > > > > specifically because of lack of the feature you're talking > about? > >>> > > > > > >>> > > > > I'm asking because I've reviewed almost all Beam IOs and don't > >>> recall > >>> > > > > seeing a similar problem. 
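Raghu's point that an extra GroupByKey can make batching deterministic comes down to making batch membership a pure function of the data, so a replay regroups identically. A plain-Java sketch of that idea (not pipeline code; the class and method names are made up):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Model of deterministic batching: impose a stable order on the elements,
// then cut them into fixed-size groups. Because grouping depends only on
// the data, a replay after a failure reproduces exactly the same batches,
// which is what a GroupByKey on a computed batch key gives a pipeline.
public class DeterministicBatches {
    public static <T extends Comparable<T>> List<List<T>> batch(List<T> elements, int batchSize) {
        List<T> sorted = new ArrayList<>(elements);
        Collections.sort(sorted); // stable, data-dependent order
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i += batchSize) {
            batches.add(sorted.subList(i, Math.min(i + batchSize, sorted.size())));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> firstRun = List.of("c", "a", "d", "b");
        List<String> replay = List.of("b", "d", "a", "c"); // same data, different arrival order
        System.out.println(batch(firstRun, 2).equals(batch(replay, 2))); // true
    }
}
```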
Sure, a lot of IOs do batching > within a > >>> > > bundle, > >>> > > > > but 1) it doesn't take up much code (granted, it would be even > >>> easier > >>> > > if > >>> > > > > Beam did it for us) and 2) I don't remember any of them > requiring > >>> the > >>> > > > > batches to be deterministic, and I'm having a hard time > imagining > >>> > what > >>> > > > kind > >>> > > > > of storage system would be able to deduplicate if batches were > >>> > > > > deterministic but wouldn't be able to deduplicate if they > weren't. > >>> > > > > > >>> > > > > On Thu, Nov 16, 2017 at 11:50 PM Romain Manni-Bucau < > >>> > > > rmannibu...@gmail.com> > >>> > > > > wrote: > >>> > > > > > >>> > > > >> Ok, let me try to step back and summarize what we have today > and > >>> > what > >>> > > I > >>> > > > >> miss: > >>> > > > >> > >>> > > > >> 1. we can handle chunking in beam through group in batch (or > >>> > > equivalent) > >>> > > > >> but: > >>> > > > >> > it is not built-in into the transforms (IO) and it is > >>> > controlled > >>> > > > >> from outside the transforms so no way for a transform to do it > >>> > > > >> properly without handling itself a composition and links > between > >>> > > > >> multiple dofns to have notifications and potentially react > >>> properly > >>> > or > >>> > > > >> handle backpressure from its backend > >>> > > > >> 2. there is no restart feature because there is no real state > >>> > handling > >>> > > > >> at the moment. 
this sounds fully delegated to the runner but > I was > >>> > > > >> hoping to have more guarantees from the used API to be able to > >>> > restart > >>> > > > >> a pipeline (mainly batch since it can be irrelevant or > delegates > >>> to > >>> > > > >> the backend for streams) and handle only not commited records > so > >>> it > >>> > > > >> requires some persistence outside the main IO storages to do > it > >>> > > > >> properly > >>> > > > >> > note this is somehow similar to the monitoring topic > which > >>> miss > >>> > > > >> persistence ATM so it can end up to beam to have a pluggable > >>> storage > >>> > > > >> for a few concerns > >>> > > > >> > >>> > > > >> > >>> > > > >> Short term I would be happy with 1 solved properly, long term > I > >>> hope > >>> > 2 > >>> > > > >> will be tackled without workarounds requiring custom wrapping > of > >>> IO > >>> > to > >>> > > > >> use a custom state persistence. > >>> > > > >> > >>> > > > >> > >>> > > > >> > >>> > > > >> Romain Manni-Bucau > >>> > > > >> @rmannibucau | Blog | Old Blog | Github | LinkedIn > >>> > > > >> > >>> > > > >> > >>> > > > >> 2017-11-17 7:44 GMT+01:00 Jean-Baptiste Onofré < > j...@nanthrax.net>: > >>> > > > >> > Thanks for the explanation. Agree, we might talk about > different > >>> > > > things > >>> > > > >> > using the same wording. > >>> > > > >> > > >>> > > > >> > I'm also struggling to understand the use case (for a > generic > >>> > DoFn). > >>> > > > >> > > >>> > > > >> > Regards > >>> > > > >> > JB > >>> > > > >> > > >>> > > > >> > > >>> > > > >> > On 11/17/2017 07:40 AM, Eugene Kirpichov wrote: > >>> > > > >> >> > >>> > > > >> >> To avoid spending a lot of time pursuing a false path, I'd > like > >>> > to > >>> > > > say > >>> > > > >> >> straight up that SDF is definitely not going to help here, > >>> > despite > >>> > > > the > >>> > > > >> >> fact > >>> > > > >> >> that its API includes the term "checkpoint". 
In SDF, the > >>> > > "checkpoint" > >>> > > > >> >> captures the state of processing within a single element. > If > >>> > you're > >>> > > > >> >> applying an SDF to 1000 elements, it will, like any other > DoFn, > >>> > be > >>> > > > >> applied > >>> > > > >> >> to each of them independently and in parallel, and you'll > have > >>> > 1000 > >>> > > > >> >> checkpoints capturing the state of processing each of these > >>> > > elements, > >>> > > > >> >> which > >>> > > > >> >> is probably not what you want. > >>> > > > >> >> > >>> > > > >> >> I'm afraid I still don't understand what kind of > checkpoint you > >>> > > > need, if > >>> > > > >> >> it > >>> > > > >> >> is not just deterministic grouping into batches. > "Checkpoint" > >>> is > >>> > a > >>> > > > very > >>> > > > >> >> broad term and it's very possible that everybody in this > thread > >>> > is > >>> > > > >> talking > >>> > > > >> >> about different things when saying it. So it would help if > you > >>> > > could > >>> > > > >> give > >>> > > > >> >> a > >>> > > > >> >> more concrete example: for example, take some IO that you > think > >>> > > > could be > >>> > > > >> >> easier to write with your proposed API, give the contents > of a > >>> > > > >> >> hypothetical > >>> > > > >> >> PCollection being written to this IO, give the code of a > >>> > > hypothetical > >>> > > > >> DoFn > >>> > > > >> >> implementing the write using your API, and explain what > you'd > >>> > > expect > >>> > > > to > >>> > > > >> >> happen at runtime. > >>> > > > >> >> > >>> > > > >> >> On Thu, Nov 16, 2017 at 10:33 PM Romain Manni-Bucau > >>> > > > >> >> <rmannibu...@gmail.com> > >>> > > > >> >> wrote: > >>> > > > >> >> > >>> > > > >> >>> @Eugene: yes and the other alternative of Reuven too but > it is > >>> > > still > >>> > > > >> >>> 1. relying on timers, 2. 
not really checkpointed > >>> > > > >> >>> > >>> > > > >> >>> In other words it seems all solutions are to create a > chunk of > >>> > > size > >>> > > > 1 > >>> > > > >> >>> and replayable to fake the lack of chunking in the > framework. > >>> > This > >>> > > > >> >>> always implies a chunk handling outside the component > >>> (typically > >>> > > > >> >>> before for an output). My point is I think IO need it in > their > >>> > own > >>> > > > >> >>> "internal" or at least control it themselves since the > chunk > >>> > size > >>> > > is > >>> > > > >> >>> part of the IO handling most of the time. > >>> > > > >> >>> > >>> > > > >> >>> I think JB spoke of the same "group before" trick using > >>> > > restrictions > >>> > > > >> >>> which can work I have to admit if SDF are implemented by > >>> > runners. > >>> > > Is > >>> > > > >> >>> there a roadmap/status on that? Last time I checked SDF > was a > >>> > > great > >>> > > > >> >>> API without support :(. > >>> > > > >> >>> > >>> > > > >> >>> > >>> > > > >> >>> > >>> > > > >> >>> Romain Manni-Bucau > >>> > > > >> >>> @rmannibucau | Blog | Old Blog | Github | LinkedIn > >>> > > > >> >>> > >>> > > > >> >>> > >>> > > > >> >>> 2017-11-17 7:25 GMT+01:00 Eugene Kirpichov > >>> > > > >> >>> <kirpic...@google.com.invalid>: > >>> > > > >> >>>> > >>> > > > >> >>>> JB, not sure what you mean? SDFs and triggers are > unrelated, > >>> > and > >>> > > > the > >>> > > > >> >>>> post > >>> > > > >> >>>> doesn't mention the word. Did you mean something else, > e.g. > >>> > > > >> restriction > >>> > > > >> >>>> perhaps? 
Either way I don't think SDFs are the solution > here; > >>> > > SDFs > >>> > > > >> have > >>> > > > >> >>> > >>> > > > >> >>> to > >>> > > > >> >>>> > >>> > > > >> >>>> do with the ability to split the processing of *a single > >>> > element* > >>> > > > over > >>> > > > >> >>>> multiple calls, whereas Romain I think is asking for > >>> repeatable > >>> > > > >> grouping > >>> > > > >> >>> > >>> > > > >> >>> of > >>> > > > >> >>>> > >>> > > > >> >>>> *multiple* elements. > >>> > > > >> >>>> > >>> > > > >> >>>> Romain - does > >>> > > > >> >>>> > >>> > > > >> >>> > >>> > > > >> >>> > >>> > > > >> https://github.com/apache/beam/blob/master/sdks/java/ > >>> > > > core/src/main/java/org/apache/beam/sdk/transforms/ > >>> GroupIntoBatches.java > >>> > > > >> >>>> > >>> > > > >> >>>> do what > >>> > > > >> >>>> you want? > >>> > > > >> >>>> > >>> > > > >> >>>> On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré < > >>> > > > >> j...@nanthrax.net> > >>> > > > >> >>>> wrote: > >>> > > > >> >>>> > >>> > > > >> >>>>> It sounds like the "Trigger" in the Splittable DoFn, no > ? > >>> > > > >> >>>>> > >>> > > > >> >>>>> https://beam.apache.org/blog/ > 2017/08/16/splittable-do-fn. > >>> html > >>> > > > >> >>>>> > >>> > > > >> >>>>> Regards > >>> > > > >> >>>>> JB > >>> > > > >> >>>>> > >>> > > > >> >>>>> > >>> > > > >> >>>>> On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote: > >>> > > > >> >>>>>> > >>> > > > >> >>>>>> it gives the fn/transform the ability to save a state > - it > >>> > can > >>> > > > get > >>> > > > >> >>>>>> back on "restart" / whatever unit we can use, probably > >>> runner > >>> > > > >> >>>>>> dependent? Without that you need to rewrite all IO > usage > >>> with > >>> > > > >> >>>>>> something like the previous pattern which makes the IO > not > >>> > self > >>> > > > >> >>>>>> sufficient and kind of makes the entry cost and usage > of > >>> beam > >>> > > way > >>> > > > >> >>>>>> further. 
> >>> > > > >> >>>>>> > >>> > > > >> >>>>>> In my mind it is exactly what jbatch/spring-batch uses > but > >>> > > > adapted > >>> > > > >> to > >>> > > > >> >>>>>> beam (stream in particular) case. > >>> > > > >> >>>>>> > >>> > > > >> >>>>>> Romain Manni-Bucau > >>> > > > >> >>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn > >>> > > > >> >>>>>> > >>> > > > >> >>>>>> > >>> > > > >> >>>>>> 2017-11-17 6:49 GMT+01:00 Reuven Lax > >>> > <re...@google.com.invalid > >>> > > >: > >>> > > > >> >>>>>>> > >>> > > > >> >>>>>>> Romain, > >>> > > > >> >>>>>>> > >>> > > > >> >>>>>>> Can you define what you mean by checkpoint? What are > the > >>> > > > semantics, > >>> > > > >> >>> > >>> > > > >> >>> what > >>> > > > >> >>>>>>> > >>> > > > >> >>>>>>> does it accomplish? > >>> > > > >> >>>>>>> > >>> > > > >> >>>>>>> Reuven > >>> > > > >> >>>>>>> > >>> > > > >> >>>>>>> On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau < > >>> > > > >> >>>>> > >>> > > > >> >>>>> rmannibu...@gmail.com> > >>> > > > >> >>>>>>> > >>> > > > >> >>>>>>> wrote: > >>> > > > >> >>>>>>> > >>> > > > >> >>>>>>>> Yes, what I propose earlier was: > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> I. checkpoint marker: > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> @AnyBeamAnnotation > >>> > > > >> >>>>>>>> @CheckpointAfter > >>> > > > >> >>>>>>>> public void someHook(SomeContext ctx); > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> II. pipeline.apply(ParDo.of(new > >>> > > > >> MyFn()).withCheckpointAlgorithm(new > >>> > > > >> >>>>>>>> CountingAlgo())) > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> III. (I like this one less) > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> // in the dofn > >>> > > > >> >>>>>>>> @CheckpointTester > >>> > > > >> >>>>>>>> public boolean shouldCheckpoint(); > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> IV. 
@Checkpointer Serializable getCheckpoint(); in > the > >>> dofn > >>> > > per > >>> > > > >> >>> > >>> > > > >> >>> element > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> Romain Manni-Bucau > >>> > > > >> >>>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> 2017-11-17 6:06 GMT+01:00 Raghu Angadi > >>> > > > <rang...@google.com.invalid > >>> > > > >> >>>> > >>> > > > >> >>>> : > >>> > > > >> >>>>>>>>> > >>> > > > >> >>>>>>>>> How would you define it (rough API is fine)?. > Without > >>> more > >>> > > > >> details, > >>> > > > >> >>>>> > >>> > > > >> >>>>> it is > >>> > > > >> >>>>>>>>> > >>> > > > >> >>>>>>>>> not easy to see wider applicability and feasibility > in > >>> > > > runners. > >>> > > > >> >>>>>>>>> > >>> > > > >> >>>>>>>>> On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau > < > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> rmannibu...@gmail.com> > >>> > > > >> >>>>>>>>> > >>> > > > >> >>>>>>>>> wrote: > >>> > > > >> >>>>>>>>> > >>> > > > >> >>>>>>>>>> This is a fair summary of the current state but > also > >>> > where > >>> > > > beam > >>> > > > >> >>> > >>> > > > >> >>> can > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> have a > >>> > > > >> >>>>>>>>>> > >>> > > > >> >>>>>>>>>> very strong added value and make big data great and > >>> > smooth. > >>> > > > >> >>>>>>>>>> > >>> > > > >> >>>>>>>>>> Instead of this replay feature isnt checkpointing > >>> > willable? > >>> > > > In > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> particular > >>> > > > >> >>>>>>>>>> > >>> > > > >> >>>>>>>>>> with SDF no? > >>> > > > >> >>>>>>>>>> > >>> > > > >> >>>>>>>>>> > >>> > > > >> >>>>>>>>>> Le 16 nov. 
2017 19:50, "Raghu Angadi" > >>> > > > >> <rang...@google.com.invalid> > >>> > > > >> >>> > >>> > > > >> >>> a > >>> > > > >> >>>>>>>>>> > >>> > > > >> >>>>>>>>>> écrit : > >>> > > > >> >>>>>>>>>> > >>> > > > >> >>>>>>>>>>> Core issue here is that there is no explicit > concept > >>> of > >>> > > > >> >>> > >>> > > > >> >>> 'checkpoint' > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> in > >>> > > > >> >>>>>>>>>>> > >>> > > > >> >>>>>>>>>>> Beam (UnboundedSource has a method > 'getCheckpointMark' > >>> > but > >>> > > > that > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> refers to > >>> > > > >> >>>>>>>>>>> > >>> > > > >> >>>>>>>>>>> the checkoint on external source). Runners do > >>> checkpoint > >>> > > > >> >>> > >>> > > > >> >>> internally > >>> > > > >> >>>>> > >>> > > > >> >>>>> as > >>> > > > >> >>>>>>>>>>> > >>> > > > >> >>>>>>>>>>> implementation detail. Flink's checkpoint model is > >>> > > entirely > >>> > > > >> >>>>> > >>> > > > >> >>>>> different > >>> > > > >> >>>>>>>>>> > >>> > > > >> >>>>>>>>>> from > >>> > > > >> >>>>>>>>>>> > >>> > > > >> >>>>>>>>>>> Dataflow's and Spark's. > >>> > > > >> >>>>>>>>>>> > >>> > > > >> >>>>>>>>>>> @StableReplay helps, but it does not explicitly > talk > >>> > about > >>> > > a > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> checkpoint > >>> > > > >> >>>>>>>>>> > >>> > > > >> >>>>>>>>>> by > >>> > > > >> >>>>>>>>>>> > >>> > > > >> >>>>>>>>>>> design. > >>> > > > >> >>>>>>>>>>> > >>> > > > >> >>>>>>>>>>> If you are looking to achieve some guarantees > with a > >>> > > > >> sink/DoFn, I > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> think > >>> > > > >> >>>>>>>>>> > >>> > > > >> >>>>>>>>>> it > >>> > > > >> >>>>>>>>>>> > >>> > > > >> >>>>>>>>>>> is better to start with the requirements. 
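For concreteness, proposal II earlier in the thread (a pluggable checkpoint algorithm passed to ParDo) might look like the following; `CheckpointAlgorithm` and `CountingAlgo` are hypothetical types from that proposal, and nothing like them exists in the Beam SDK.

```java
// Sketch of proposal II: a pluggable policy deciding when the runner
// should commit the records processed so far. Both types are
// hypothetical; this only illustrates the proposed contract.
public class CheckpointPolicyDemo {
    interface CheckpointAlgorithm<T> {
        // Called once per element; true means "commit everything so far".
        boolean shouldCheckpoint(T element);
    }

    // The CountingAlgo from the proposal: checkpoint every N elements.
    static final class CountingAlgo<T> implements CheckpointAlgorithm<T> {
        private final int every;
        private int seen = 0;

        CountingAlgo(int every) {
            this.every = every;
        }

        @Override
        public boolean shouldCheckpoint(T element) {
            return ++seen % every == 0;
        }
    }

    public static void main(String[] args) {
        CountingAlgo<String> algo = new CountingAlgo<>(3);
        StringBuilder marks = new StringBuilder();
        for (String record : new String[] {"a", "b", "c", "d", "e", "f", "g"}) {
            marks.append(algo.shouldCheckpoint(record) ? 'C' : '.');
        }
        System.out.println(marks); // ..C..C.
    }
}
```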
I > worked on > >>> > > > >> >>> > >>> > > > >> >>> exactly-once > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> sink > >>> > > > >> >>>>>>>>>> > >>> > > > >> >>>>>>>>>> for > >>> > > > >> >>>>>>>>>>> > >>> > > > >> >>>>>>>>>>> Kafka (see KafkaIO.write().withEOS()), where we > >>> > > essentially > >>> > > > >> >>> > >>> > > > >> >>> reshard > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> the > >>> > > > >> >>>>>>>>>>> > >>> > > > >> >>>>>>>>>>> elements and assign sequence numbers to elements > with > >>> in > >>> > > > each > >>> > > > >> >>> > >>> > > > >> >>> shard. > >>> > > > >> >>>>>>>>>>> > >>> > > > >> >>>>>>>>>>> Duplicates in replays are avoided based on these > >>> > sequence > >>> > > > >> >>> > >>> > > > >> >>> numbers. > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> DoFn > >>> > > > >> >>>>>>>>>>> > >>> > > > >> >>>>>>>>>>> state API is used to buffer out-of order replays. > The > >>> > > > >> >>> > >>> > > > >> >>> implementation > >>> > > > >> >>>>>>>>>>> > >>> > > > >> >>>>>>>>>>> strategy works in Dataflow but not in Flink which > has > >>> a > >>> > > > >> >>> > >>> > > > >> >>> horizontal > >>> > > > >> >>>>>>>>>>> > >>> > > > >> >>>>>>>>>>> checkpoint. KafkaIO checks for compatibility. > >>> > > > >> >>>>>>>>>>> > >>> > > > >> >>>>>>>>>>> On Wed, Nov 15, 2017 at 12:38 AM, Romain > Manni-Bucau < > >>> > > > >> >>>>>>>>>>> rmannibu...@gmail.com> > >>> > > > >> >>>>>>>>>>> wrote: > >>> > > > >> >>>>>>>>>>> > >>> > > > >> >>>>>>>>>>>> Hi guys, > >>> > > > >> >>>>>>>>>>>> > >>> > > > >> >>>>>>>>>>>> The subject is a bit provocative but the topic is > >>> real > >>> > > and > >>> > > > >> >>> > >>> > > > >> >>> coming > >>> > > > >> >>>>>>>>>>>> > >>> > > > >> >>>>>>>>>>>> again and again with the beam usage: how a dofn > can > >>> > > handle > >>> > > > >> some > >>> > > > >> >>>>>>>>>>>> "chunking". 
> >>> > > > >> >>>>>>>>>>>> > >>> > > > >> >>>>>>>>>>>> The need is to be able to commit each N records > but > >>> > with > >>> > > N > >>> > > > not > >>> > > > >> >>> > >>> > > > >> >>> too > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> big. > >>> > > > >> >>>>>>>>>>>> > >>> > > > >> >>>>>>>>>>>> > >>> > > > >> >>>>>>>>>>>> The natural API for that in beam is the bundle > one > >>> but > >>> > > > bundles > >>> > > > >> >>> > >>> > > > >> >>> are > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> not > >>> > > > >> >>>>>>>>>>>> > >>> > > > >> >>>>>>>>>>>> reliable since they can be very small (flink) - > we > >>> can > >>> > > say > >>> > > > it > >>> > > > >> is > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> "ok" > >>> > > > >> >>>>>>>>>>>> > >>> > > > >> >>>>>>>>>>>> even if it has some perf impacts - or too big > (spark > >>> > does > >>> > > > full > >>> > > > >> >>> > >>> > > > >> >>> size > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> / > >>> > > > >> >>>>>>>>>>>> > >>> > > > >> >>>>>>>>>>>> #workers). > >>> > > > >> >>>>>>>>>>>> > >>> > > > >> >>>>>>>>>>>> The workaround is what we see in the ES I/O: a > >>> maxSize > >>> > > > which > >>> > > > >> >>> > >>> > > > >> >>> does > >>> > > > >> >>>>> > >>> > > > >> >>>>> an > >>> > > > >> >>>>>>>>>>>> > >>> > > > >> >>>>>>>>>>>> eager flush. The issue is that then the > checkpoint is > >>> > not > >>> > > > >> >>> > >>> > > > >> >>> respected > >>> > > > >> >>>>>>>>>>>> > >>> > > > >> >>>>>>>>>>>> and you can process multiple times the same > records. > >>> > > > >> >>>>>>>>>>>> > >>> > > > >> >>>>>>>>>>>> Any plan to make this API reliable and > controllable > >>> > from > >>> > > a > >>> > > > >> beam > >>> > > > >> >>>>>>>> > >>> > > > >> >>>>>>>> point > >>> > > > >> >>>>>>>>>>>> > >>> > > > >> >>>>>>>>>>>> of view (at least in a max manner)? 
> >>> > > > >> >>>>>>>>>>>> Thanks, > >>> > > > >> >>>>>>>>>>>> Romain Manni-Bucau > >>> > > > >> >>>>>>>>>>>> @rmannibucau | Blog | Old Blog | Github | > LinkedIn > >>> > > > >> >>>>> -- > >>> > > > >> >>>>> Jean-Baptiste Onofré > >>> > > > >> >>>>> jbono...@apache.org > >>> > > > >> >>>>> http://blog.nanthrax.net > >>> > > > >> >>>>> Talend - http://www.talend.com