Yep, just take the ES IO: if a part of a bundle fails, you are in an
unmanaged state. This is the case for all the O's (of IO ;)). The issue
with "1" is not so much the code it takes, but the fact it doesn't
integrate with runner features, retries in particular: what happens if a
bundle has a failure? => undefined today. On "2", I'm fine with it as
long as we know exactly what happens when we restart after a bundle
failure. With ES the timestamp can be used, for instance.

Thinking about it, the SDF really is a way to do it: since the SDF will
manage the bulking, combined with the runner "retry" it seems to cover
the needs.

Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-17 9:23 GMT+01:00 Eugene Kirpichov <kirpic...@google.com.invalid>:
> I must admit I'm still failing to understand the problem, so let's step
> back even further.
>
> Could you give an example of an IO that is currently difficult to implement
> specifically because of lack of the feature you're talking about?
>
> I'm asking because I've reviewed almost all Beam IOs and don't recall
> seeing a similar problem. Sure, a lot of IOs do batching within a bundle,
> but 1) it doesn't take up much code (granted, it would be even easier if
> Beam did it for us) and 2) I don't remember any of them requiring the
> batches to be deterministic, and I'm having a hard time imagining what kind
> of storage system would be able to deduplicate if batches were
> deterministic but wouldn't be able to deduplicate if they weren't.
>
> On Thu, Nov 16, 2017 at 11:50 PM Romain Manni-Bucau <rmannibu...@gmail.com>
> wrote:
>
>> Ok, let me try to step back and summarize what we have today and what I
>> miss:
>>
>> 1. we can handle chunking in beam through group-into-batches (or an
>> equivalent) but:
>>    > it is not built into the transforms (IO) and it is controlled
>> from outside the transforms, so there is no way for a transform to do
>> it properly without itself handling a composition and links between
>> multiple dofns to get notifications, react appropriately, or handle
>> backpressure from its backend
>> 2. there is no restart feature because there is no real state handling
>> at the moment. This sounds fully delegated to the runner, but I was
>> hoping for more guarantees from the API to be able to restart a
>> pipeline (mainly batch, since it can be irrelevant or delegated to the
>> backend for streams) and reprocess only uncommitted records, so it
>> requires some persistence outside the main IO storages to do it
>> properly
>>    > note this is somehow similar to the monitoring topic, which also
>> misses persistence ATM, so beam may end up needing a pluggable storage
>> for a few concerns
>>
>>
>> Short term I would be happy with 1 solved properly; long term I hope 2
>> will be tackled without workarounds requiring custom wrapping of IOs
>> to use a custom state persistence.
>>
>>
>>
>> Romain Manni-Bucau
>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>
>>
>> 2017-11-17 7:44 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>> > Thanks for the explanation. Agree, we might talk about different things
>> > using the same wording.
>> >
>> > I'm also struggling to understand the use case (for a generic DoFn).
>> >
>> > Regards
>> > JB
>> >
>> >
>> > On 11/17/2017 07:40 AM, Eugene Kirpichov wrote:
>> >>
>> >> To avoid spending a lot of time pursuing a false path, I'd like to say
>> >> straight up that SDF is definitely not going to help here, despite the
>> >> fact that its API includes the term "checkpoint". In SDF, the
>> >> "checkpoint" captures the state of processing within a single element.
>> >> If you're applying an SDF to 1000 elements, it will, like any other
>> >> DoFn, be applied to each of them independently and in parallel, and
>> >> you'll have 1000 checkpoints capturing the state of processing each of
>> >> these elements, which is probably not what you want.
>> >>
>> >> I'm afraid I still don't understand what kind of checkpoint you need,
>> >> if it is not just deterministic grouping into batches. "Checkpoint" is
>> >> a very broad term and it's very possible that everybody in this thread
>> >> is talking about different things when saying it. So it would help if
>> >> you could give a more concrete example: for example, take some IO that
>> >> you think could be easier to write with your proposed API, give the
>> >> contents of a hypothetical PCollection being written to this IO, give
>> >> the code of a hypothetical DoFn implementing the write using your API,
>> >> and explain what you'd expect to happen at runtime.
>> >>
>> >> On Thu, Nov 16, 2017 at 10:33 PM Romain Manni-Bucau
>> >> <rmannibu...@gmail.com>
>> >> wrote:
>> >>
>> >>> @Eugene: yes, and Reuven's other alternative too, but it is still
>> >>> 1. relying on timers, 2. not really checkpointed.
>> >>>
>> >>> In other words, it seems all solutions amount to creating a
>> >>> replayable chunk of size 1 to fake the lack of chunking in the
>> >>> framework. This always implies chunk handling outside the component
>> >>> (typically before it, for an output). My point is that IOs need it in
>> >>> their own "internals", or at least to control it themselves, since
>> >>> the chunk size is part of the IO handling most of the time.
>> >>>
>> >>> I think JB spoke of the same "group before" trick using restrictions,
>> >>> which can work, I have to admit, if SDFs are implemented by runners.
>> >>> Is there a roadmap/status on that? Last time I checked SDF was a
>> >>> great API without support :(.
>> >>>
>> >>>
>> >>>
>> >>> Romain Manni-Bucau
>> >>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >>>
>> >>>
>> >>> 2017-11-17 7:25 GMT+01:00 Eugene Kirpichov
>> >>> <kirpic...@google.com.invalid>:
>> >>>>
>> >>>> JB, not sure what you mean? SDFs and triggers are unrelated, and the
>> >>>> post doesn't mention the word. Did you mean something else, e.g.
>> >>>> restriction perhaps? Either way I don't think SDFs are the solution
>> >>>> here; SDFs have to do with the ability to split the processing of *a
>> >>>> single element* over multiple calls, whereas Romain I think is
>> >>>> asking for repeatable grouping of *multiple* elements.
>> >>>>
>> >>>> Romain - does
>> >>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
>> >>>> do what you want?
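The grouping semantics of the transform linked above can be sketched, very roughly, in plain Java. This is an illustration only, not the actual Beam implementation: the real GroupIntoBatches works per key and window and is built on state and timers; the class and method names below are made up for the sketch.

```java
import java.util.ArrayList;
import java.util.List;

public class GroupIntoBatchesSketch {

    // Groups values into batches of at most batchSize elements, emitting
    // a final partial batch for the remainder - roughly what
    // GroupIntoBatches.ofSize(batchSize) does for the values of one key.
    public static <V> List<List<V>> groupIntoBatches(List<V> values, int batchSize) {
        List<List<V>> batches = new ArrayList<>();
        List<V> current = new ArrayList<>();
        for (V v : values) {
            current.add(v);
            if (current.size() == batchSize) {
                batches.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            batches.add(current); // flush the trailing partial batch
        }
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(groupIntoBatches(List.of(1, 2, 3, 4, 5), 2));
        // [[1, 2], [3, 4], [5]]
    }
}
```

Note this says nothing about *which* elements land in which batch across retries, which is exactly the determinism question raised in this thread.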
>> >>>>
>> >>>> On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré <
>> >>>> j...@nanthrax.net> wrote:
>> >>>>
>> >>>>> It sounds like the "Trigger" in the Splittable DoFn, no ?
>> >>>>>
>> >>>>> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
>> >>>>>
>> >>>>> Regards
>> >>>>> JB
>> >>>>>
>> >>>>>
>> >>>>> On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote:
>> >>>>>>
>> >>>>>> it gives the fn/transform the ability to save a state it can get
>> >>>>>> back on "restart" (or whatever unit we can use, probably runner
>> >>>>>> dependent). Without that you need to rewrite every IO usage with
>> >>>>>> something like the previous pattern, which makes the IO not
>> >>>>>> self-sufficient and raises the entry cost and usage of beam
>> >>>>>> considerably.
>> >>>>>>
>> >>>>>> In my mind it is exactly what jbatch/spring-batch uses, but
>> >>>>>> adapted to beam's (stream in particular) case.
>> >>>>>>
>> >>>>>> Romain Manni-Bucau
>> >>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >>>>>>
>> >>>>>>
>> >>>>>> 2017-11-17 6:49 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
>> >>>>>>>
>> >>>>>>> Romain,
>> >>>>>>>
>> >>>>>>> Can you define what you mean by checkpoint? What are the
>> >>>>>>> semantics, what does it accomplish?
>> >>>>>>>
>> >>>>>>> Reuven
>> >>>>>>>
>> >>>>>>> On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau <
>> >>>>>>> rmannibu...@gmail.com> wrote:
>> >>>>>>>
>> >>>>>>>> Yes, what I propose earlier was:
>> >>>>>>>>
>> >>>>>>>> I. checkpoint marker:
>> >>>>>>>>
>> >>>>>>>> @AnyBeamAnnotation
>> >>>>>>>> @CheckpointAfter
>> >>>>>>>> public void someHook(SomeContext ctx);
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> II. pipeline.apply(ParDo.of(new MyFn())
>> >>>>>>>>       .withCheckpointAlgorithm(new CountingAlgo()))
>> >>>>>>>>
>> >>>>>>>> III. (I like this one less)
>> >>>>>>>>
>> >>>>>>>> // in the dofn
>> >>>>>>>> @CheckpointTester
>> >>>>>>>> public boolean shouldCheckpoint();
>> >>>>>>>>
>> >>>>>>>> IV. @Checkpointer Serializable getCheckpoint(); in the dofn, per
>> >>>>>>>> element
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Romain Manni-Bucau
>> >>>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> 2017-11-17 6:06 GMT+01:00 Raghu Angadi <rang...@google.com.invalid>:
>> >>>>>>>>>
>> >>>>>>>>> How would you define it (rough API is fine)? Without more
>> >>>>>>>>> details, it is not easy to see wider applicability and
>> >>>>>>>>> feasibility in runners.
>> >>>>>>>>>
>> >>>>>>>>> On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau <
>> >>>>>>>>> rmannibu...@gmail.com> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> This is a fair summary of the current state, but also where
>> >>>>>>>>>> beam can have a very strong added value and make big data great
>> >>>>>>>>>> and smooth.
>> >>>>>>>>>>
>> >>>>>>>>>> Instead of this replay feature, wouldn't checkpointing be
>> >>>>>>>>>> desirable? In particular with SDF, no?
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> On 16 Nov 2017 at 19:50, "Raghu Angadi" <rang...@google.com.invalid>
>> >>>>>>>>>> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>>> Core issue here is that there is no explicit concept of
>> >>>>>>>>>>> 'checkpoint' in Beam (UnboundedSource has a method
>> >>>>>>>>>>> 'getCheckpointMark', but that refers to the checkpoint on the
>> >>>>>>>>>>> external source). Runners do checkpoint internally as an
>> >>>>>>>>>>> implementation detail. Flink's checkpoint model is entirely
>> >>>>>>>>>>> different from Dataflow's and Spark's.
>> >>>>>>>>>>>
>> >>>>>>>>>>> @StableReplay helps, but it does not explicitly talk about a
>> >>>>>>>>>>> checkpoint, by design.
>> >>>>>>>>>>>
>> >>>>>>>>>>> If you are looking to achieve some guarantees with a
>> >>>>>>>>>>> sink/DoFn, I think it is better to start with the
>> >>>>>>>>>>> requirements. I worked on an exactly-once sink for Kafka (see
>> >>>>>>>>>>> KafkaIO.write().withEOS()), where we essentially reshard the
>> >>>>>>>>>>> elements and assign sequence numbers to elements within each
>> >>>>>>>>>>> shard. Duplicates in replays are avoided based on these
>> >>>>>>>>>>> sequence numbers. The DoFn state API is used to buffer
>> >>>>>>>>>>> out-of-order replays. The implementation strategy works in
>> >>>>>>>>>>> Dataflow but not in Flink, which has a horizontal checkpoint.
>> >>>>>>>>>>> KafkaIO checks for compatibility.
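The sequence-number deduplication Raghu describes can be sketched, stripped of all Beam machinery, as follows. The class and method names are hypothetical; the real KafkaIO EOS sink additionally uses the DoFn state API to buffer out-of-order replays, which this sketch ignores by assuming in-order delivery per shard.

```java
import java.util.HashMap;
import java.util.Map;

public class SequenceDedupSketch {

    // Highest sequence number already committed, per shard.
    private final Map<Integer, Long> committed = new HashMap<>();

    // Returns true if the element is new and should be written, false if
    // it is a replayed duplicate. Assumes each shard's elements arrive in
    // sequence order, which is what resharding + sequencing buys you.
    public boolean shouldWrite(int shard, long seq) {
        long last = committed.getOrDefault(shard, -1L);
        if (seq <= last) {
            return false; // already written by a previous (failed) attempt
        }
        committed.put(shard, seq);
        return true;
    }

    public static void main(String[] args) {
        SequenceDedupSketch sink = new SequenceDedupSketch();
        System.out.println(sink.shouldWrite(0, 0)); // true
        System.out.println(sink.shouldWrite(0, 1)); // true
        System.out.println(sink.shouldWrite(0, 1)); // false (replay)
        System.out.println(sink.shouldWrite(1, 0)); // true (other shard)
    }
}
```

The point of the scheme is that replays become idempotent without the batches themselves having to be deterministic: only the (shard, sequence) assignment has to be stable.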
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau <
>> >>>>>>>>>>> rmannibu...@gmail.com>
>> >>>>>>>>>>> wrote:
>> >>>>>>>>>>>
>> >>>>>>>>>>>> Hi guys,
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> The subject is a bit provocative, but the topic is real and
>> >>>>>>>>>>>> comes up again and again with beam usage: how can a dofn
>> >>>>>>>>>>>> handle some "chunking"?
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> The need is to be able to commit every N records, with N not
>> >>>>>>>>>>>> too big.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> The natural API for that in beam is the bundle one, but
>> >>>>>>>>>>>> bundles are not reliable since they can be very small (flink)
>> >>>>>>>>>>>> - we can say it is "ok" even if it has some perf impact - or
>> >>>>>>>>>>>> too big (spark does full size / #workers).
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> The workaround is what we see in the ES I/O: a maxSize which
>> >>>>>>>>>>>> does an eager flush. The issue is that then the checkpoint is
>> >>>>>>>>>>>> not respected and you can process the same records multiple
>> >>>>>>>>>>>> times.
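The eager-flush workaround being discussed boils down to something like the following buffer inside a write fn. This is a schematic, Beam-free sketch under assumed names: `bulkWriter` stands in for the actual bulk request, and `add`/`flush` would be called from processElement/finishBundle respectively.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class EagerFlushBuffer<T> {

    private final int maxSize;
    private final Consumer<List<T>> bulkWriter; // e.g. one ES bulk request
    private final List<T> buffer = new ArrayList<>();

    public EagerFlushBuffer(int maxSize, Consumer<List<T>> bulkWriter) {
        this.maxSize = maxSize;
        this.bulkWriter = bulkWriter;
    }

    // Called per element: flushes eagerly once maxSize is reached,
    // regardless of how large the runner decided to make the bundle.
    public void add(T element) {
        buffer.add(element);
        if (buffer.size() >= maxSize) {
            flush();
        }
    }

    // Called at bundle finish: flushes the remainder. If the bundle fails
    // after an eager flush, the already-flushed records get replayed -
    // exactly the duplication problem described above.
    public void flush() {
        if (!buffer.isEmpty()) {
            bulkWriter.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}
```

The mid-bundle flushes are precisely the writes the runner's bundle-level retry knows nothing about.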
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Any plan to make this API reliable and controllable from a
>> >>>>>>>>>>>> beam point of view (at least as a max bound)?
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Thanks,
>> >>>>>>>>>>>> Romain Manni-Bucau
>> >>>>>>>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >>>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>
>> >>>>>
>> >>>>> --
>> >>>>> Jean-Baptiste Onofré
>> >>>>> jbono...@apache.org
>> >>>>> http://blog.nanthrax.net
>> >>>>> Talend - http://www.talend.com
>> >>>>>
>> >>>
>> >>
>> >
>> > --
>> > Jean-Baptiste Onofré
>> > jbono...@apache.org
>> > http://blog.nanthrax.net
>> > Talend - http://www.talend.com
>>
