Yep, just take the ES IO: if part of a bundle fails you are in an unmanaged state. This is the case for all the "O" (of IO ;)). The issue is not so much about 1 (the amount of code it takes) but about the fact that it doesn't integrate with runner features and, potentially, retries: what happens if a bundle has a failure? => undefined today. 2. I'm fine with it as long as we know exactly what happens when we restart after a bundle failure. With ES the timestamp can be used, for instance.
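The failure mode described above can be sketched outside Beam. The following plain-Java illustration (class and method names are ours, not the actual ElasticsearchIO code) shows the "maxSize eager flush" pattern discussed in this thread: once a partial flush has happened, a retry of the failed bundle re-sends records that were already committed.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the eager-flush pattern: buffer elements, flush a bulk
// request whenever the buffer reaches maxSize, and flush the remainder at the
// end of the bundle. If the bundle fails after some flushes, those writes are
// already committed, so replaying the bundle duplicates them.
class EagerFlushBuffer {
    private final int maxSize;
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();

    EagerFlushBuffer(int maxSize) { this.maxSize = maxSize; }

    public void processElement(String element) {
        buffer.add(element);
        if (buffer.size() >= maxSize) {
            flush();
        }
    }

    public void finishBundle() {
        if (!buffer.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        // In the real IO this would be a bulk request to Elasticsearch,
        // committed on the backend as soon as it is sent.
        flushed.add(new ArrayList<>(buffer));
        buffer.clear();
    }

    public List<List<String>> flushedBatches() { return flushed; }
}
```

With `maxSize = 2` and elements a, b, c, the first batch [a, b] is committed eagerly; a bundle failure between that flush and the final one leaves the sink in the "unmanaged state" described above.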
Thinking about it, the SDF is really a way to do it, since the SDF will manage the bulking and, combined with the runner "retry", it seems to cover the needs.

Romain Manni-Bucau
@rmannibucau | Blog | Old Blog | Github | LinkedIn

2017-11-17 9:23 GMT+01:00 Eugene Kirpichov <kirpic...@google.com.invalid>:
> I must admit I'm still failing to understand the problem, so let's step
> back even further.
>
> Could you give an example of an IO that is currently difficult to
> implement specifically because of the lack of the feature you're talking
> about?
>
> I'm asking because I've reviewed almost all Beam IOs and don't recall
> seeing a similar problem. Sure, a lot of IOs do batching within a bundle,
> but 1) it doesn't take up much code (granted, it would be even easier if
> Beam did it for us) and 2) I don't remember any of them requiring the
> batches to be deterministic, and I'm having a hard time imagining what
> kind of storage system would be able to deduplicate if batches were
> deterministic but wouldn't be able to deduplicate if they weren't.
>
> On Thu, Nov 16, 2017 at 11:50 PM Romain Manni-Bucau <rmannibu...@gmail.com>
> wrote:
>
>> Ok, let me try to step back and summarize what we have today and what I
>> miss:
>>
>> 1. we can handle chunking in Beam through group-into-batches (or an
>>    equivalent) but it is not built into the transforms (IO) and it is
>>    controlled from outside the transforms, so there is no way for a
>>    transform to do it properly without itself handling a composition and
>>    links between multiple DoFns to get notifications and potentially
>>    react properly or handle backpressure from its backend
>> 2. there is no restart feature because there is no real state handling
>>    at the moment.
>>    This sounds fully delegated to the runner, but I was hoping to have
>>    more guarantees from the API used, to be able to restart a pipeline
>>    (mainly batch, since it can be irrelevant or delegated to the backend
>>    for streams) and handle only uncommitted records, so it requires some
>>    persistence outside the main IO storages to do it properly.
>>    Note this is somehow similar to the monitoring topic, which misses
>>    persistence ATM, so it can end up with Beam having a pluggable
>>    storage for a few concerns.
>>
>> Short term I would be happy with 1 solved properly; long term I hope 2
>> will be tackled without workarounds requiring custom wrapping of IOs to
>> use a custom state persistence.
>>
>> Romain Manni-Bucau
>> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>>
>> 2017-11-17 7:44 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>>> Thanks for the explanation. Agreed, we might be talking about different
>>> things while using the same wording.
>>>
>>> I'm also struggling to understand the use case (for a generic DoFn).
>>>
>>> Regards
>>> JB
>>>
>>> On 11/17/2017 07:40 AM, Eugene Kirpichov wrote:
>>>> To avoid spending a lot of time pursuing a false path, I'd like to say
>>>> straight up that SDF is definitely not going to help here, despite the
>>>> fact that its API includes the term "checkpoint". In SDF, the
>>>> "checkpoint" captures the state of processing within a single element.
>>>> If you're applying an SDF to 1000 elements, it will, like any other
>>>> DoFn, be applied to each of them independently and in parallel, and
>>>> you'll have 1000 checkpoints capturing the state of processing each of
>>>> these elements, which is probably not what you want.
>>>>
>>>> I'm afraid I still don't understand what kind of checkpoint you need,
>>>> if it is not just deterministic grouping into batches.
"Checkpoint" is a very >> >> broad term and it's very possible that everybody in this thread is >> talking >> >> about different things when saying it. So it would help if you could >> give >> >> a >> >> more concrete example: for example, take some IO that you think could be >> >> easier to write with your proposed API, give the contents of a >> >> hypothetical >> >> PCollection being written to this IO, give the code of a hypothetical >> DoFn >> >> implementing the write using your API, and explain what you'd expect to >> >> happen at runtime. >> >> >> >> On Thu, Nov 16, 2017 at 10:33 PM Romain Manni-Bucau >> >> <rmannibu...@gmail.com> >> >> wrote: >> >> >> >>> @Eugene: yes and the other alternative of Reuven too but it is still >> >>> 1. relying on timers, 2. not really checkpointed >> >>> >> >>> In other words it seems all solutions are to create a chunk of size 1 >> >>> and replayable to fake the lack of chunking in the framework. This >> >>> always implies a chunk handling outside the component (typically >> >>> before for an output). My point is I think IO need it in their own >> >>> "internal" or at least control it themselves since the chunk size is >> >>> part of the IO handling most of the time. >> >>> >> >>> I think JB spoke of the same "group before" trick using restrictions >> >>> which can work I have to admit if SDF are implemented by runners. Is >> >>> there a roadmap/status on that? Last time I checked SDF was a great >> >>> API without support :(. >> >>> >> >>> >> >>> >> >>> Romain Manni-Bucau >> >>> @rmannibucau | Blog | Old Blog | Github | LinkedIn >> >>> >> >>> >> >>> 2017-11-17 7:25 GMT+01:00 Eugene Kirpichov >> >>> <kirpic...@google.com.invalid>: >> >>>> >> >>>> JB, not sure what you mean? SDFs and triggers are unrelated, and the >> >>>> post >> >>>> doesn't mention the word. Did you mean something else, e.g. >> restriction >> >>>> perhaps? 
>>>>>> Either way I don't think SDFs are the solution here; SDFs have to do
>>>>>> with the ability to split the processing of *a single element* over
>>>>>> multiple calls, whereas Romain, I think, is asking for repeatable
>>>>>> grouping of *multiple* elements.
>>>>>>
>>>>>> Romain - does
>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
>>>>>> do what you want?
>>>>>>
>>>>>> On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré <j...@nanthrax.net>
>>>>>> wrote:
>>>>>>
>>>>>>> It sounds like the "Trigger" in the Splittable DoFn, no?
>>>>>>>
>>>>>>> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
>>>>>>>
>>>>>>> Regards
>>>>>>> JB
>>>>>>>
>>>>>>> On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote:
>>>>>>>> It gives the fn/transform the ability to save a state it can get
>>>>>>>> back on "restart" / whatever unit we can use, probably runner
>>>>>>>> dependent? Without that you need to rewrite all IO usage with
>>>>>>>> something like the previous pattern, which makes the IO not
>>>>>>>> self-sufficient and raises the entry cost and usage of Beam quite
>>>>>>>> a bit.
>>>>>>>>
>>>>>>>> In my mind it is exactly what jbatch/spring-batch uses, but
>>>>>>>> adapted to the Beam (stream in particular) case.
>>>>>>>>
>>>>>>>> Romain Manni-Bucau
>>>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>>>>>>>>
>>>>>>>> 2017-11-17 6:49 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
>>>>>>>>> Romain,
>>>>>>>>>
>>>>>>>>> Can you define what you mean by checkpoint? What are the
>>>>>>>>> semantics, what does it accomplish?
>>>>>>>>> Reuven
>>>>>>>>>
>>>>>>>>> On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau
>>>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Yes, what I proposed earlier was:
>>>>>>>>>>
>>>>>>>>>> I. checkpoint marker:
>>>>>>>>>>
>>>>>>>>>> @AnyBeamAnnotation
>>>>>>>>>> @CheckpointAfter
>>>>>>>>>> public void someHook(SomeContext ctx);
>>>>>>>>>>
>>>>>>>>>> II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new
>>>>>>>>>> CountingAlgo()))
>>>>>>>>>>
>>>>>>>>>> III. (I like this one less)
>>>>>>>>>>
>>>>>>>>>> // in the dofn
>>>>>>>>>> @CheckpointTester
>>>>>>>>>> public boolean shouldCheckpoint();
>>>>>>>>>>
>>>>>>>>>> IV. @Checkpointer Serializable getCheckpoint(); in the dofn, per
>>>>>>>>>> element
>>>>>>>>>>
>>>>>>>>>> Romain Manni-Bucau
>>>>>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>>>>>>>>>>
>>>>>>>>>> 2017-11-17 6:06 GMT+01:00 Raghu Angadi <rang...@google.com.invalid>:
>>>>>>>>>>> How would you define it (a rough API is fine)? Without more
>>>>>>>>>>> details, it is not easy to see wider applicability and
>>>>>>>>>>> feasibility in runners.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau
>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> This is a fair summary of the current state, but also where
>>>>>>>>>>>> Beam can have a very strong added value and make big data
>>>>>>>>>>>> great and smooth.
>>>>>>>>>>>>
>>>>>>>>>>>> Instead of this replay feature, isn't checkpointing doable? In
>>>>>>>>>>>> particular with SDF, no?
>>>>>>>>>>>>
>>>>>>>>>>>> On 16 Nov
>>>>>>>>>>>> 2017 at 19:50, "Raghu Angadi" <rang...@google.com.invalid> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Core issue here is that there is no explicit concept of
>>>>>>>>>>>>> 'checkpoint' in Beam (UnboundedSource has a method
>>>>>>>>>>>>> 'getCheckpointMark', but that refers to the checkpoint on the
>>>>>>>>>>>>> external source). Runners do checkpoint internally as an
>>>>>>>>>>>>> implementation detail. Flink's checkpoint model is entirely
>>>>>>>>>>>>> different from Dataflow's and Spark's.
>>>>>>>>>>>>>
>>>>>>>>>>>>> @StableReplay helps, but it does not explicitly talk about a
>>>>>>>>>>>>> checkpoint, by design.
>>>>>>>>>>>>>
>>>>>>>>>>>>> If you are looking to achieve some guarantees with a
>>>>>>>>>>>>> sink/DoFn, I think it is better to start with the
>>>>>>>>>>>>> requirements. I worked on the exactly-once sink for Kafka
>>>>>>>>>>>>> (see KafkaIO.write().withEOS()), where we essentially reshard
>>>>>>>>>>>>> the elements and assign sequence numbers to elements within
>>>>>>>>>>>>> each shard. Duplicates in replays are avoided based on these
>>>>>>>>>>>>> sequence numbers. The DoFn state API is used to buffer
>>>>>>>>>>>>> out-of-order replays. The implementation strategy works in
>>>>>>>>>>>>> Dataflow but not in Flink, which has a horizontal checkpoint.
>>>>>>>>>>>>> KafkaIO checks for compatibility.
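The sequence-number deduplication Raghu describes can be illustrated with a stripped-down, non-Beam sketch (our illustrative class, not the actual KafkaIO code; it assumes in-order replays, whereas the real sink additionally buffers out-of-order replays with the state API): each shard remembers the last sequence number it committed, and a replayed element at or below that number is dropped.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified illustration of the idea behind KafkaIO.write().withEOS():
// elements are resharded and given per-shard sequence numbers, and the sink
// drops any element whose sequence number it has already committed.
class SequenceDedup {
    // Last committed sequence number per shard (kept in DoFn state in the
    // real sink so it survives retries).
    private final Map<Integer, Long> lastSeq = new HashMap<>();

    // Returns true if the element should be written, false if it is a
    // replayed duplicate from a previous (possibly failed) attempt.
    public boolean shouldWrite(int shard, long sequence) {
        long last = lastSeq.getOrDefault(shard, -1L);
        if (sequence <= last) {
            return false;
        }
        lastSeq.put(shard, sequence);
        return true;
    }
}
```

Because deduplication keys on (shard, sequence), a bundle replay re-offers the same sequence numbers and the duplicates are filtered out, which is what makes the sink exactly-once despite at-least-once bundle retries.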
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau
>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi guys,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The subject is a bit provocative, but the topic is real and
>>>>>>>>>>>>>> comes up again and again with Beam usage: how can a DoFn
>>>>>>>>>>>>>> handle some "chunking"?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The need is to be able to commit every N records, but with N
>>>>>>>>>>>>>> not too big.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The natural API for that in Beam is the bundle one, but
>>>>>>>>>>>>>> bundles are not reliable since they can be very small
>>>>>>>>>>>>>> (Flink) - we can say it is "ok" even if it has some perf
>>>>>>>>>>>>>> impact - or too big (Spark does full size / #workers).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The workaround is what we see in the ES I/O: a maxSize which
>>>>>>>>>>>>>> does an eager flush. The issue is that then the checkpoint
>>>>>>>>>>>>>> is not respected and you can process the same records
>>>>>>>>>>>>>> multiple times.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Any plan to make this API reliable and controllable from a
>>>>>>>>>>>>>> Beam point of view (at least in a "max" manner)?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Romain Manni-Bucau
>>>>>>>>>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>>>>>>>
>>>>>>> --
>>>>>>> Jean-Baptiste Onofré
>>>>>>> jbono...@apache.org
>>>>>>> http://blog.nanthrax.net
>>>>>>> Talend - http://www.talend.com
>>>
>>> --
>>> Jean-Baptiste Onofré
>>> jbono...@apache.org
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
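Stepping back from the thread: the "commit every N records" need maps to the per-key batching that GroupIntoBatches (linked above by Eugene) provides. Below is a stripped-down plain-Java sketch of those semantics (an illustrative class of ours, not the Beam transform; it ignores the windows and timers the real transform uses to flush incomplete batches).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of what GroupIntoBatches does semantically: buffer
// values per key and emit a batch whenever batchSize values have accumulated.
class BatchPerKey {
    private final int batchSize;
    private final Map<String, List<String>> buffers = new HashMap<>();
    private final List<List<String>> emitted = new ArrayList<>();

    BatchPerKey(int batchSize) { this.batchSize = batchSize; }

    public void add(String key, String value) {
        List<String> buf = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buf.add(value);
        if (buf.size() >= batchSize) {
            // In Beam this would be output() of a KV<key, Iterable<value>>.
            emitted.add(new ArrayList<>(buf));
            buf.clear();
        }
    }

    public List<List<String>> emittedBatches() { return emitted; }
}
```

This is exactly the "chunking from outside the transform" shape Romain objects to: the batching lives in front of the write, not inside the IO, and nothing here makes the batches deterministic across a bundle replay.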