@Eugene: yes, and Reuven's alternative does too, but both are still
1. relying on timers, 2. not really checkpointed

In other words, it seems all the solutions create a replayable chunk
of size 1 to work around the lack of chunking in the framework. This
always implies handling the chunk outside the component (typically
before it, for an output). My point is that I think IOs need this
internally, or at least need to control it themselves, since the chunk
size is part of the IO handling most of the time.
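
To make "the IO controls its own chunk size" concrete, here is a minimal
sketch in plain Java. It is not a Beam API: ChunkedWriter, chunkSize and
the commit callback are hypothetical names used only for illustration,
and it says nothing about how a runner would checkpoint the buffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch (not Beam): an output that owns its chunk size. */
final class ChunkedWriter<T> {
    private final int chunkSize;
    private final Consumer<List<T>> commit;
    private final List<T> buffer = new ArrayList<>();

    ChunkedWriter(int chunkSize, Consumer<List<T>> commit) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be > 0");
        }
        this.chunkSize = chunkSize;
        this.commit = commit;
    }

    /** Buffers one element and commits as soon as a full chunk is available. */
    void write(T element) {
        buffer.add(element);
        if (buffer.size() >= chunkSize) {
            flush();
        }
    }

    /** Commits whatever is buffered, e.g. at the end of a bundle. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // Hand a copy to the callback. If commit throws, the buffer is
        // left untouched, so the same chunk is offered again on the
        // next flush.
        commit.accept(new ArrayList<>(buffer));
        buffer.clear();
    }
}
```

With chunkSize = 3, writing 7 elements and then calling flush() yields
commits of 3, 3 and 1 elements. What is missing, and what I am asking
for, is a way for the framework to treat each such commit as a
checkpoint boundary.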

I think JB spoke of the same "group before" trick using restrictions,
which I have to admit can work if SDFs are implemented by runners. Is
there a roadmap/status on that? Last time I checked, SDF was a great
API without support :(.



Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-17 7:25 GMT+01:00 Eugene Kirpichov <[email protected]>:
> JB, not sure what you mean? SDFs and triggers are unrelated, and the post
> doesn't mention the word. Did you mean something else, e.g. restriction
> perhaps? Either way I don't think SDFs are the solution here; SDFs have to
> do with the ability to split the processing of *a single element* over
> multiple calls, whereas Romain I think is asking for repeatable grouping of
> *multiple* elements.
>
> Romain - does
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
> do what
> you want?
>
> On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré <[email protected]>
> wrote:
>
>> It sounds like the "Trigger" in the Splittable DoFn, no ?
>>
>> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
>>
>> Regards
>> JB
>>
>>
>> On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote:
>> > it gives the fn/transform the ability to save a state - it can get
>> > back on "restart" / whatever unit we can use, probably runner
>> > dependent? Without that you need to rewrite all IO usage with
>> > something like the previous pattern, which makes the IO not
>> > self-sufficient and raises the entry cost of using Beam
>> > considerably.
>> >
>> > In my mind it is exactly what jbatch/spring-batch uses but adapted to
>> > beam (stream in particular) case.
>> >
>> > Romain Manni-Bucau
>> > @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >
>> >
>> > 2017-11-17 6:49 GMT+01:00 Reuven Lax <[email protected]>:
>> >> Romain,
>> >>
>> >> Can you define what you mean by checkpoint? What are the semantics, what
>> >> does it accomplish?
>> >>
>> >> Reuven
>> >>
>> >> On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau <
>> [email protected]>
>> >> wrote:
>> >>
>> >>> Yes, what I proposed earlier was:
>> >>>
>> >>> I. checkpoint marker:
>> >>>
>> >>> @AnyBeamAnnotation
>> >>> @CheckpointAfter
>> >>> public void someHook(SomeContext ctx);
>> >>>
>> >>>
>> >>> II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new
>> >>> CountingAlgo()))
>> >>>
>> >>> III. (I like this one less)
>> >>>
>> >>> // in the dofn
>> >>> @CheckpointTester
>> >>> public boolean shouldCheckpoint();
>> >>>
>> >>> IV. @Checkpointer Serializable getCheckpoint(); in the dofn per element
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> Romain Manni-Bucau
>> >>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >>>
>> >>>
>> >>> 2017-11-17 6:06 GMT+01:00 Raghu Angadi <[email protected]>:
>> >>>> How would you define it (rough API is fine)? Without more details,
>> it is
>> >>>> not easy to see wider applicability and feasibility in runners.
>> >>>>
>> >>>> On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau <
>> >>> [email protected]>
>> >>>> wrote:
>> >>>>
>> >>>>> This is a fair summary of the current state but also where beam can
>> >>> have a
>> >>>>> very strong added value and make big data great and smooth.
>> >>>>>
>> >>>>> Instead of this replay feature, isn't checkpointing desirable? In
>> >>> particular
>> >>>>> with SDF no?
>> >>>>>
>> >>>>>
>> >>>>> Le 16 nov. 2017 19:50, "Raghu Angadi" <[email protected]> a
>> >>>>> écrit :
>> >>>>>
>> >>>>>> Core issue here is that there is no explicit concept of 'checkpoint'
>> >>> in
>> >>>>>> Beam (UnboundedSource has a method 'getCheckpointMark' but that
>> >>> refers to
>> >>>>>> the checkpoint on the external source). Runners do checkpoint internally
>> as
>> >>>>>> implementation detail. Flink's checkpoint model is entirely
>> different
>> >>>>> from
>> >>>>>> Dataflow's and Spark's.
>> >>>>>>
>> >>>>>> @StableReplay helps, but it does not explicitly talk about a
>> >>> checkpoint
>> >>>>> by
>> >>>>>> design.
>> >>>>>>
>> >>>>>> If you are looking to achieve some guarantees with a sink/DoFn, I
>> >>> think
>> >>>>> it
>> >>>>>> is better to start with the requirements. I worked on exactly-once
>> >>> sink
>> >>>>> for
>> >>>>>> Kafka (see KafkaIO.write().withEOS()), where we essentially reshard
>> >>> the
>> >>>>>> elements and assign sequence numbers to elements within each shard.
>> >>>>>> Duplicates in replays are avoided based on these sequence numbers.
>> >>> DoFn
>> >>>>>> state API is used to buffer out-of-order replays. The implementation
>> >>>>>> strategy works in Dataflow but not in Flink which has a horizontal
>> >>>>>> checkpoint. KafkaIO checks for compatibility.
>> >>>>>>
>> >>>>>> On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau <
>> >>>>>> [email protected]>
>> >>>>>> wrote:
>> >>>>>>
>> >>>>>>> Hi guys,
>> >>>>>>>
>> >>>>>>> The subject is a bit provocative but the topic is real and coming
>> >>>>>>> again and again with the beam usage: how a dofn can handle some
>> >>>>>>> "chunking".
>> >>>>>>>
>> >>>>>>> The need is to be able to commit each N records but with N not too
>> >>> big.
>> >>>>>>>
>> >>>>>>> The natural API for that in beam is the bundle one but bundles are
>> >>> not
>> >>>>>>> reliable since they can be very small (flink) - we can say it is
>> >>> "ok"
>> >>>>>>> even if it has some perf impacts - or too big (spark does full size
>> >>> /
>> >>>>>>> #workers).
>> >>>>>>>
>> >>>>>>> The workaround is what we see in the ES I/O: a maxSize which does
>> an
>> >>>>>>> eager flush. The issue is that then the checkpoint is not respected
>> >>>>>>> and you can process multiple times the same records.
>> >>>>>>>
>> >>>>>>> Any plan to make this API reliable and controllable from a beam
>> >>> point
>> >>>>>>> of view (at least in a max manner)?
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>> Romain Manni-Bucau
>> >>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >>>>>>>
>> >>>>>>
>> >>>>>
>> >>>
>>
>> --
>> Jean-Baptiste Onofré
>> [email protected]
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
