Guys, what about moving getMaxBundleSize from the Flink options to the core pipeline options? I think all runners can support it, right? The Spark implementation probably has to wait for the v2 merge, but I don't see any blocker.
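To make the proposal concrete, here is a minimal plain-Java sketch (no Beam dependency) of what an upper-bounded bundle size would guarantee to a DoFn. `BundleFn`, `BoundedBundleRunner` and `CommittingWriter` are hypothetical stand-ins for a runner honoring a maxBundleSize-style option and for a DoFn's @StartBundle/@ProcessElement/@FinishBundle hooks. This is not Beam or runner code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a DoFn's bundle lifecycle
// (@StartBundle / @ProcessElement / @FinishBundle in Beam).
interface BundleFn<T> {
  void startBundle();
  void processElement(T element);
  void finishBundle();
}

// Hypothetical runner loop: cuts the input into bundles of at most
// maxBundleSize elements, i.e. what an upper-bounded bundle size
// option would guarantee to every DoFn.
final class BoundedBundleRunner {
  static <T> void runWithMaxBundleSize(List<T> input, long maxBundleSize, BundleFn<T> fn) {
    if (maxBundleSize <= 0) {
      throw new IllegalArgumentException("maxBundleSize must be positive");
    }
    int next = 0;
    while (next < input.size()) {
      fn.startBundle();
      long inBundle = 0;
      while (next < input.size() && inBundle < maxBundleSize) {
        fn.processElement(input.get(next++));
        inBundle++;
      }
      fn.finishBundle();
    }
  }
}

// A writer that buffers records and "commits" them in finishBundle,
// so no commit can contain more than maxBundleSize records.
final class CommittingWriter implements BundleFn<String> {
  final List<List<String>> commits = new ArrayList<>();
  private List<String> buffer = new ArrayList<>();

  @Override public void startBundle() { buffer = new ArrayList<>(); }
  @Override public void processElement(String element) { buffer.add(element); }
  @Override public void finishBundle() { commits.add(buffer); }
}
```

With five records and a bound of 2, the writer commits `[a, b]`, `[c, d]` and `[e]`: a commit done in finishBundle can never exceed the bound, whatever the total input size.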
wdyt?

Romain Manni-Bucau
@rmannibucau | Blog | Old Blog | Github | LinkedIn

2017-11-19 8:19 GMT+01:00 Romain Manni-Bucau wrote:
> @Eugene: I call it a "workaround" because it is specific to each IO, and therefore it still highlights a gap in the core.
>
> Other comments inline.
>
> 2017-11-19 7:40 GMT+01:00 Robert Bradshaw wrote:
>> There is a possible fourth issue that we don't handle well: efficiency. For very large bundles, it may be advantageous to avoid replaying a bunch of idempotent operations if there were a way to record which ones we're sure went through. Not sure if that's the issue here (though one could possibly do this with SDFs: one can preemptively return periodically before an element (or portion thereof) is done).
>
> +1, this also leads to the IO handling its own chunking/bundles, and therefore solves all issues at once.
>
>> On Sat, Nov 18, 2017 at 6:58 PM, Eugene Kirpichov wrote:
>>
>>> I disagree that the usage of document id in ES is a "workaround" - it does not address any *accidental* complexity <https://en.wikipedia.org/wiki/No_Silver_Bullet> coming from shortcomings of Beam; it addresses the *essential* complexity: a distributed system forces one to take it as a fact of nature that the same write (mutation) will happen multiple times, so if you want a mutation to happen "as-if" it happened exactly once, the mutation itself must be idempotent <https://en.wikipedia.org/wiki/Idempotence>. Insert-with-id (upsert <https://en.wikipedia.org/wiki/Merge_(SQL)>) is a classic example of an idempotent mutation, and it's very good that Elasticsearch provides it - if it didn't, no matter how good of an API Beam had, achieving exactly-once writes would be theoretically impossible. Are we in agreement on this so far?
>>>
>>> Next: you seem to be discussing 3 issues together, all of which are valid issues, but they seem unrelated to me:
>>> 1.
Exactly-once mutation
>>> 2. Batching multiple mutations into one RPC.
>>> 3. Backpressure
>>>
>>> #1: was considered above. The system the IO is talking to has to support idempotent mutations, in an IO-specific way, and the IO has to take advantage of them, in the IO-specific way - end of story.
>
> Agree, but don't forget the original point was about "chunks", not individual records.
>
>>> #2: a batch of idempotent operations is also idempotent, so this doesn't add anything new semantically. Syntactically - Beam already allows you to write your own batching by notifying you of permitted batch boundaries (Start/FinishBundle). Sure, it could do more, but from my experience the batching in IOs I've seen is one of the easiest and least error-prone parts, so I don't see something worth an extended discussion here.
>
> "Beam already allows you to write your own batching by notifying you of permitted batch boundaries (Start/FinishBundle)" is wrong, since the bundle is potentially the whole PCollection (Spark), so this is not even an option unless you use SDF (back to the same point).
> Once again, the API looks fine, but no implementation makes it true. It would be easy to change in Spark; Flink can be OK since it targets the streaming case more; not sure about the others, any idea?
>
>>> #3: handling backpressure is a complex problem with multiple facets: 1) how do you know you're being throttled, and by how much are you exceeding the external system's capacity?
>
> This is the whole point of backpressure: the system sends it back to you (in general through a header or a status).
>
>>> 2) how do you communicate this signal to the runner?
>
> You are a client, so you get the metadata in the response, whatever the technology.
>
>>> 3) what does the runner do in response?
>
> The runner does nothing; the IO adapts its handling as mentioned before (wait and retry, skip, ...
depending on the config)
>
>>> 4) how do you wait until it's ok to try again?
>
> This is one point Beam could probably enhance, but waiting during processing is an option if the source has some buffering; otherwise it requires a fallback buffer with a max size when the wait mode is activated.
>
>>> You seem to be advocating for solving one facet of this problem, which is: you want it to be possible to signal to the runner "I'm being throttled, please end the bundle", right? If so - I think this (ending the bundle) is unnecessary: the DoFn can simply do an exponential back-off sleep loop.
>
> Agreed, I never said the runner should know, but GBK+output doesn't work because you don't own the GBK.
>
>>> This is e.g. what DatastoreIO does <https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java#L1318> and this is in general how most systems I've seen handle backpressure. Is there something I'm missing? In particular, is there any compelling reason why you think it'd be beneficial e.g. for DatastoreIO to commit the results of the bundle so far before processing other elements?
>
> It was more about validating a subset of the whole bundle early, and avoiding reprocessing it if something fails later.
>
> So to summarize, I see 2 outcomes:
>
> 1. implement SDF in all runners
> 2. make the bundle size upper-bounded - through a pipeline option - in all runners; not sure this one is doable everywhere since I mainly checked the Spark case
>
>>> Again, it might be that I'm still misunderstanding what you're trying to say. One of the things it would help to clarify would be - exactly what do you mean by "how batch frameworks solved that for years": can you point at an existing API in some other framework that achieves what you want?
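Points #1 and #3 combine naturally, and a minimal plain-Java sketch shows how. `UpsertStore` is a hypothetical in-memory stand-in for a backend with insert-with-id semantics (such as ES with document ids), and `BackoffWriter.writeWithBackoff` retries a whole batch with an exponential back-off sleep loop. It is modeled loosely on the approach described above, not on DatastoreIO's actual code. Because every write is an upsert keyed by id, a batch that failed halfway can be replayed safely.

```java
import java.util.HashMap;
import java.util.Map;

// Signals that the backend rejected a request because we are over quota.
final class ThrottledException extends Exception {
  ThrottledException() { super("throttled"); }
}

// Hypothetical in-memory stand-in for a backend that supports
// insert-with-id (upsert): writing the same (id, document) pair again
// leaves the same state, so replaying a batch is harmless.
final class UpsertStore {
  final Map<String, String> docs = new HashMap<>();
  private int throttledCallsLeft;

  UpsertStore(int throttledCallsLeft) { this.throttledCallsLeft = throttledCallsLeft; }

  // While throttledCallsLeft > 0, a call applies the first half of the
  // batch and then fails, like a bulk request rejected midway.
  void bulkUpsert(Map<String, String> batch) throws ThrottledException {
    int applied = 0;
    for (Map.Entry<String, String> e : batch.entrySet()) {
      if (throttledCallsLeft > 0 && applied == batch.size() / 2) {
        throttledCallsLeft--;
        throw new ThrottledException();
      }
      docs.put(e.getKey(), e.getValue());
      applied++;
    }
  }
}

final class BackoffWriter {
  // Retries the whole batch, sleeping initialMillis, 2x, 4x, ... between
  // attempts; returns the number of attempts that were needed.
  static int writeWithBackoff(UpsertStore store, Map<String, String> batch,
                              int maxAttempts, long initialMillis) {
    long sleepMillis = initialMillis;
    for (int attempt = 1; ; attempt++) {
      try {
        store.bulkUpsert(batch);
        return attempt;
      } catch (ThrottledException e) {
        if (attempt >= maxAttempts) {
          throw new IllegalStateException("still throttled after " + attempt + " attempts", e);
        }
      }
      try {
        Thread.sleep(sleepMillis);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while backing off", ie);
      }
      sleepMillis *= 2;
    }
  }
}
```

With two throttled responses, the batch goes through on the third attempt. The documents written by the partial attempts are simply overwritten with the same values, so the final state equals the batch.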
>>>
>>> On Sat, Nov 18, 2017 at 1:02 PM Romain Manni-Bucau wrote:
>>>
>>>> Eugene, the point - and the issue with a single sample - is that you can always find *workarounds* on a case-by-case basis, like the id one with ES, but Beam doesn't solve the problem as a framework.
>>>>
>>>> From my past experience, I clearly don't see how batch frameworks solved that for years and Beam is not able to do it - keep in mind it is the same kind of technology, it just uses different sources and bigger clusters, so there is no real reason not to have the same feature quality. The only potential reason I see is that there is no end-to-end tracking of the state in the cluster. But I don't see why there wouldn't be. Am I missing something here?
>>>>
>>>> An example could be: take a GitHub crawler computing stats on all GitHub repos, based on a REST client. You will need to handle the rate limit, and you will likely want to "commit" each time you hit the rate limit, likely with some buffering strategy with a max size before really waiting. How do you do that with a GBK independent of your DoFn? You are not able to compose the fns correctly :(.
>>>>
>>>> On 18 Nov 2017 20:48, "Eugene Kirpichov" wrote:
>>>>
>>>> After giving this thread my best attempt at understanding exactly what the problem and the proposed solution are, I'm afraid I still fail to understand both. To reiterate, I think the only way to make progress here is to be more concrete: (quote) take some IO that you think could be easier to write with your proposed API, give the contents of a hypothetical PCollection being written to this IO, give the code of a hypothetical DoFn implementing the write using your API, and explain what you'd expect to happen at runtime. I'm going to re-engage in this thread after such an example is given.
>>>>
>>>> On Sat, Nov 18, 2017, 5:00 AM Romain Manni-Bucau wrote:
>>>>
>>>>> First, bundle retry is unusable with some runners like Spark, where the bundle size is the collection size / number of workers. This means a user can't use the bundle API or feature reliably and portably - which is Beam's promise. Aligning chunking and bundles would guarantee that, but it may not be desired; that is why I thought it could be a separate feature.
>>>>>
>>>>> GBK works until the IO needs to know about it, and the two concepts are not always orthogonal - backpressure-like systems are a trivial, common example. This means the IO (DoFn) must be able to do it itself at some point.
>>>>>
>>>>> Also note that GBK works only if the IO can take a list, which is never the case today.
>>>>>
>>>>> The big questions for me are: is SDF the way to go, since it provides the needed API but is not yet supported? What about existing IOs? Should Beam provide automatic wrapping of DoFns for that pre-aggregated support, and simulate bundles for the actual IO implementation to keep the existing API?
>>>>>
>>>>> On 17 Nov 2017 19:20, "Raghu Angadi" wrote:
>>>>>
>>>>> On Fri, Nov 17, 2017 at 1:02 AM, Romain Manni-Bucau wrote:
>>>>>
>>>>>> Yep, just take ES IO: if part of a bundle fails, you are in an unmanaged state. This is the case for all O (of IO ;)). The issue is not so much about "1" (the code it takes) but more the fact that it doesn't integrate with runner features and potentially retries: what happens if a bundle has a failure? => undefined today. 2. I'm fine with it as long as we know exactly what happens when we restart after a bundle failure. With ES, the timestamp can be used, for instance.
>>>>>
>>>>> This deterministic batching can be achieved even now with an extra GroupByKey (and if you want ordering on top of that, you will need another GBK). Don't know if that is too costly in your case. I would need a bit more detail on how ES IO write retries are handled to see if it could be simplified. Note that retries occur with or without any failures in your DoFn.
>>>>>
>>>>> The biggest negative with the GBK approach is that it doesn't provide the same guarantees on Flink.
>>>>>
>>>>> I don't see how GroupIntoBatches in Beam provides specific guarantees on deterministic batches.
>>>>>
>>>>>> Thinking about it, SDF is really a way to do it, since the SDF will manage the bulking, and combined with the runner "retry" it seems to cover the needs.
>>>>>>
>>>>>> 2017-11-17 9:23 GMT+01:00 Eugene Kirpichov wrote:
>>>>>>> I must admit I'm still failing to understand the problem, so let's step back even further.
>>>>>>>
>>>>>>> Could you give an example of an IO that is currently difficult to implement specifically because of lack of the feature you're talking about?
>>>>>>>
>>>>>>> I'm asking because I've reviewed almost all Beam IOs and don't recall seeing a similar problem.
>>>>>>> Sure, a lot of IOs do batching within a bundle, but 1) it doesn't take up much code (granted, it would be even easier if Beam did it for us) and 2) I don't remember any of them requiring the batches to be deterministic, and I'm having a hard time imagining what kind of storage system would be able to deduplicate if batches were deterministic but wouldn't be able to deduplicate if they weren't.
>>>>>>>
>>>>>>> On Thu, Nov 16, 2017 at 11:50 PM Romain Manni-Bucau wrote:
>>>>>>>
>>>>>>>> Ok, let me try to step back and summarize what we have today and what I miss:
>>>>>>>>
>>>>>>>> 1. we can handle chunking in Beam through group into batches (or equivalent), but:
>>>>>>>>    - it is not built into the transforms (IO), and it is controlled from outside the transforms, so there is no way for a transform to do it properly without itself handling a composition and links between multiple DoFns to get notifications and potentially react properly or handle backpressure from its backend
>>>>>>>> 2. there is no restart feature because there is no real state handling at the moment.
>>>>>>>>    This sounds fully delegated to the runner, but I was hoping to get more guarantees from the API in use, to be able to restart a pipeline (mainly batch, since for streams it can be irrelevant or delegated to the backend) and handle only uncommitted records, so it requires some persistence outside the main IO storages to do it properly
>>>>>>>>    - note this is somewhat similar to the monitoring topic, which lacks persistence ATM, so Beam may end up with pluggable storage for a few concerns
>>>>>>>>
>>>>>>>> Short term, I would be happy with 1 solved properly; long term, I hope 2 will be tackled without workarounds requiring custom wrapping of IOs to use custom state persistence.
>>>>>>>>
>>>>>>>> 2017-11-17 7:44 GMT+01:00 Jean-Baptiste Onofré wrote:
>>>>>>>>> Thanks for the explanation. Agree, we might be talking about different things using the same wording.
>>>>>>>>>
>>>>>>>>> I'm also struggling to understand the use case (for a generic DoFn).
>>>>>>>>>
>>>>>>>>> Regards
>>>>>>>>> JB
>>>>>>>>>
>>>>>>>>> On 11/17/2017 07:40 AM, Eugene Kirpichov wrote:
>>>>>>>>>>
>>>>>>>>>> To avoid spending a lot of time pursuing a false path, I'd like to say straight up that SDF is definitely not going to help here, despite the fact that its API includes the term "checkpoint". In SDF, the "checkpoint" captures the state of processing within a single element.
>>>>>>>>>> If you're applying an SDF to 1000 elements, it will, like any other DoFn, be applied to each of them independently and in parallel, and you'll have 1000 checkpoints capturing the state of processing each of these elements, which is probably not what you want.
>>>>>>>>>>
>>>>>>>>>> I'm afraid I still don't understand what kind of checkpoint you need, if it is not just deterministic grouping into batches. "Checkpoint" is a very broad term and it's very possible that everybody in this thread is talking about different things when saying it. So it would help if you could give a more concrete example: for example, take some IO that you think could be easier to write with your proposed API, give the contents of a hypothetical PCollection being written to this IO, give the code of a hypothetical DoFn implementing the write using your API, and explain what you'd expect to happen at runtime.
>>>>>>>>>>
>>>>>>>>>> On Thu, Nov 16, 2017 at 10:33 PM Romain Manni-Bucau wrote:
>>>>>>>>>>
>>>>>>>>>>> @Eugene: yes, and Reuven's alternative too, but it is still 1. relying on timers, 2. not really checkpointed.
>>>>>>>>>>>
>>>>>>>>>>> In other words, it seems all solutions create a replayable chunk of size 1 to fake the lack of chunking in the framework. This always implies chunk handling outside the component (typically before it, for an output).
>>>>>>>>>>> My point is, I think IOs need it in their own "internals", or at least need to control it themselves, since the chunk size is part of the IO handling most of the time.
>>>>>>>>>>>
>>>>>>>>>>> I think JB spoke of the same "group before" trick using restrictions, which, I have to admit, can work if SDF is implemented by runners. Is there a roadmap/status on that? Last time I checked, SDF was a great API without support :(.
>>>>>>>>>>>
>>>>>>>>>>> 2017-11-17 7:25 GMT+01:00 Eugene Kirpichov wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> JB, not sure what you mean? SDFs and triggers are unrelated, and the post doesn't mention the word. Did you mean something else, e.g. restriction perhaps? Either way I don't think SDFs are the solution here; SDFs have to do with the ability to split the processing of *a single element* over multiple calls, whereas Romain I think is asking for repeatable grouping of *multiple* elements.
>>>>>>>>>>>>
>>>>>>>>>>>> Romain - does https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java do what you want?
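For reference, here is a simplified single-process model of per-key batching in the spirit of GroupIntoBatches. `PerKeyBatcher` is a hypothetical plain-Java class, not Beam's implementation, which keeps its buffer in per-key state and emits the remainder when the window ends. The model also makes the determinism question raised in this thread visible: batch boundaries depend on the order in which elements arrive.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, single-process model of per-key batching in the spirit of
// GroupIntoBatches: values are buffered per key, a batch is emitted as
// soon as a key's buffer reaches batchSize, and leftovers are emitted by
// flushAll() (in Beam this happens when the window ends).
final class PerKeyBatcher<K, V> {
  private final int batchSize;
  private final Map<K, List<V>> buffers = new LinkedHashMap<>();
  final List<Map.Entry<K, List<V>>> emitted = new ArrayList<>();

  PerKeyBatcher(int batchSize) {
    if (batchSize <= 0) throw new IllegalArgumentException("batchSize must be positive");
    this.batchSize = batchSize;
  }

  void add(K key, V value) {
    List<V> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
    buffer.add(value);
    if (buffer.size() >= batchSize) {
      emit(key, buffer);
    }
  }

  void flushAll() {
    for (Map.Entry<K, List<V>> e : buffers.entrySet()) {
      if (!e.getValue().isEmpty()) {
        emit(e.getKey(), e.getValue());
      }
    }
  }

  // Copies the buffer into an emitted (key, batch) pair, then empties it.
  private void emit(K key, List<V> buffer) {
    emitted.add(new AbstractMap.SimpleImmutableEntry<K, List<V>>(key, new ArrayList<>(buffer)));
    buffer.clear();
  }
}
```

Adding `k1` values 1, 2, 3 with a batch size of 2 yields `[1, 2]` then `[3]`. Adding the same values in the order 3, 1, 2 yields `[3, 1]` then `[2]`, so in this model a replay that sees a different order produces different batches.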
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> It sounds like the "Trigger" in the Splittable DoFn, no?
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards
>>>>>>>>>>>>> JB
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It gives the fn/transform the ability to save a state that it can get back on "restart" / whatever unit we can use, probably runner-dependent? Without that, you need to rewrite all IO usage with something like the previous pattern, which makes the IO not self-sufficient and pushes the entry cost of using Beam much higher.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In my mind, it is exactly what jbatch/spring-batch use, but adapted to the Beam (stream in particular) case.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2017-11-17 6:49 GMT+01:00 Reuven Lax wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Romain,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Can you define what you mean by checkpoint? What are the semantics, what does it accomplish?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Reuven
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Yes, what I proposed earlier was:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I. checkpoint marker:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> @AnyBeamAnnotation
>>>>>>>>>>>>>>>> @CheckpointAfter
>>>>>>>>>>>>>>>> public void someHook(SomeContext ctx);
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new CountingAlgo()))
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> III. (I like this one less)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> // in the dofn
>>>>>>>>>>>>>>>> @CheckpointTester
>>>>>>>>>>>>>>>> public boolean shouldCheckpoint();
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> IV. @Checkpointer Serializable getCheckpoint(); in the DoFn, per element
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2017-11-17 6:06 GMT+01:00 Raghu Angadi wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> How would you define it (rough API is fine)? Without more details, it is not easy to see wider applicability and feasibility in runners.
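Proposal II above can be read as a counting policy plus a checkpoint hook. Below is a hypothetical plain-Java model of that reading. It assumes the checkpoint simply persists an offset into a replayable input, which is only one possible interpretation. `CheckpointAlgorithm`, `CountingAlgo` and `CheckpointingDriver` are illustrative names taken from or inspired by the proposal, and none of them is a Beam API. After a failure, only the elements processed since the last checkpoint are processed again.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical policy deciding when to checkpoint (not a Beam API).
interface CheckpointAlgorithm {
  boolean shouldCheckpoint(long processedSinceLastCheckpoint);
}

// Hypothetical counting policy: checkpoint every `every` elements.
final class CountingAlgo implements CheckpointAlgorithm {
  private final long every;
  CountingAlgo(long every) { this.every = every; }
  @Override public boolean shouldCheckpoint(long processedSinceLastCheckpoint) {
    return processedSinceLastCheckpoint >= every;
  }
}

// Hypothetical driver: processes a replayable input from the last
// committed offset, calls the checkpoint hook (think @CheckpointAfter)
// when the policy says so or at the end of the input, then records the
// new offset. In a real system committedOffset would need durable storage.
final class CheckpointingDriver<T> {
  private final CheckpointAlgorithm algo;
  private final Runnable checkpointHook;
  int committedOffset = 0;

  CheckpointingDriver(CheckpointAlgorithm algo, Runnable checkpointHook) {
    this.algo = algo;
    this.checkpointHook = checkpointHook;
  }

  // failAt >= 0 simulates a crash just before processing that index.
  void run(List<T> input, Consumer<T> process, int failAt) {
    long sinceLast = 0;
    for (int i = committedOffset; i < input.size(); i++) {
      if (i == failAt) {
        throw new IllegalStateException("simulated failure at index " + i);
      }
      process.accept(input.get(i));
      sinceLast++;
      if (algo.shouldCheckpoint(sinceLast) || i == input.size() - 1) {
        checkpointHook.run();
        committedOffset = i + 1;
        sinceLast = 0;
      }
    }
  }
}
```

With a checkpoint every 2 elements and a crash before the fourth one, a restart sees `c` again (it was processed after the last checkpoint) but not `a` or `b`.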
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> This is a fair summary of the current state, but also of where Beam can have a very strong added value and make big data great and smooth.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Instead of this replay feature, isn't checkpointing viable? In particular with SDF, no?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 16 Nov 2017 19:50, "Raghu Angadi" wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Core issue here is that there is no explicit concept of 'checkpoint' in Beam (UnboundedSource has a method 'getCheckpointMark' but that refers to the checkpoint on the external source). Runners do checkpoint internally as an implementation detail. Flink's checkpoint model is entirely different from Dataflow's and Spark's.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> @StableReplay helps, but it does not explicitly talk about a checkpoint by design.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> If you are looking to achieve some guarantees with a sink/DoFn, I think it is better to start with the requirements. I worked on the exactly-once sink for Kafka (see KafkaIO.write().withEOS()), where we essentially reshard the elements and assign sequence numbers to elements within each shard. Duplicates in replays are avoided based on these sequence numbers. The DoFn state API is used to buffer out-of-order replays. The implementation strategy works in Dataflow but not in Flink, which has a horizontal checkpoint. KafkaIO checks for compatibility.
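The sequence-number scheme described above can be modeled in a few lines of plain Java. `SequencedShardSink` is a hypothetical, single-process simplification. It keeps the next expected sequence number per shard, drops replayed records below it, and buffers out-of-order ones until the gap is filled. The real KafkaIO sink keeps this bookkeeping in Beam state, which the sketch does not model.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified, single-process model of per-shard sequence-number
// deduplication: records carry (shard, seq); the sink writes each seq
// exactly once and in order, whatever the replays look like.
final class SequencedShardSink {
  private final Map<Integer, Long> nextSeq = new HashMap<>();
  private final Map<Integer, TreeMap<Long, String>> pending = new HashMap<>();
  final List<String> written = new ArrayList<>();

  void receive(int shard, long seq, String record) {
    long expected = nextSeq.getOrDefault(shard, 0L);
    if (seq < expected) {
      return;  // replay of a record that was already written: drop it
    }
    TreeMap<Long, String> buffer = pending.computeIfAbsent(shard, s -> new TreeMap<>());
    buffer.putIfAbsent(seq, record);  // hold out-of-order records back
    // Write every record that is now contiguous with what was written.
    while (buffer.containsKey(expected)) {
      written.add(buffer.remove(expected));
      expected++;
    }
    nextSeq.put(shard, expected);
  }
}
```

Here `c` (seq 2) arriving before `b` (seq 1) is held back until `b` shows up. Later replays of `a` and `b` are dropped because their sequence numbers are below the next expected one.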
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi guys,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The subject is a bit provocative, but the topic is real and keeps coming up with Beam usage: how a DoFn can handle some "chunking".
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The need is to be able to commit every N records, with N not too big.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The natural API for that in Beam is the bundle one, but bundles are not reliable since they can be very small (Flink) - we can say it is "ok" even if it has some perf impact - or too big (Spark uses full size / #workers).
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The workaround is what we see in the ES I/O: a maxSize which does an eager flush. The issue is that the checkpoint is then not respected, and you can process the same records multiple times.
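A minimal plain-Java model of the eager-flush problem: `EagerFlushDemo` (hypothetical) flushes its buffer whenever it reaches `maxSize` and again at the end of the bundle, and a failed bundle is retried from its first element, as a runner would do. Records flushed before the failure are sent twice. That is harmless only if the backend write is idempotent (for example an upsert keyed by document id).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of an eager-flushing writer: the buffer is flushed
// when it reaches maxSize and again at the end of the bundle. A failed
// bundle is retried from its first element, as a runner would do.
final class EagerFlushDemo {
  final Map<String, Integer> timesSent = new HashMap<>();
  private final int maxSize;
  private final List<String> buffer = new ArrayList<>();

  EagerFlushDemo(int maxSize) { this.maxSize = maxSize; }

  private void flush() {
    for (String record : buffer) {
      timesSent.merge(record, 1, Integer::sum);  // "send" to the backend
    }
    buffer.clear();
  }

  // failAt >= 0 simulates a crash just before processing that index.
  void processBundle(List<String> bundle, int failAt) {
    buffer.clear();  // a retried bundle starts with an empty buffer
    for (int i = 0; i < bundle.size(); i++) {
      if (i == failAt) {
        throw new IllegalStateException("bundle failed at index " + i);
      }
      buffer.add(bundle.get(i));
      if (buffer.size() >= maxSize) {
        flush();  // eager flush: these records are now visible in the backend
      }
    }
    flush();  // what finishBundle would do
  }
}
```

With `maxSize` 2 and a failure before the fourth record, `r1` and `r2` were already flushed and are sent again on retry, while `r3` (still buffered when the bundle failed) is sent only once.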
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Any plan to make this API reliable and controllable from a Beam point of view (at least as an upper bound)?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> Romain Manni-Bucau
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>>>>>> Talend - http://www.talend.com
