Hmm, I didn't find the doc - if you have the link handy it would be
appreciated - but "before" doesn't sound sufficient: shouldn't it be
"after", in case there was a "flush"?

Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-15 10:10 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
> If you set @StableReplay before a ParDo, it forces a checkpoint before that
> ParDo.
>
> On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
> wrote:
>
>> It sounds like a good start. I'm not sure how a group by key (and not
>> by size) can help control the checkpointing interval, though. I wonder
>> whether we shouldn't have a CheckpointPolicy { boolean
>> shouldCheckpoint() } used in the processing event loop. The default
>> could be up to the runner, but if set on the transform (or DoFn) it
>> would control when the checkpoint is done. Thinking out loud, it
>> sounds close to the jbatch checkpoint algorithm
>> (https://docs.oracle.com/javaee/7/api/javax/batch/api/chunk/CheckpointAlgorithm.html)
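>>
>> To make it concrete, here is a minimal sketch of what I have in mind
>> (all names are hypothetical - nothing like this exists in Beam today -
>> and it loosely mirrors the jbatch CheckpointAlgorithm contract):
>>
>>   // Hypothetical SPI: the runner's processing loop would ask the
>>   // policy, after each element, whether to checkpoint/commit now.
>>   public interface CheckpointPolicy {
>>     boolean shouldCheckpoint();
>>   }
>>
>>   // Example policy: checkpoint every N records, similar to a jbatch
>>   // item-count checkpoint.
>>   public class CountingCheckpointPolicy implements CheckpointPolicy {
>>     private final int interval;
>>     private int seen;
>>
>>     public CountingCheckpointPolicy(int interval) {
>>       this.interval = interval;
>>     }
>>
>>     @Override
>>     public boolean shouldCheckpoint() {
>>       return ++seen % interval == 0;
>>     }
>>   }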
>>
>> Romain Manni-Bucau
>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>
>>
>> 2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>> > Yes, @StableReplay, that's the annotation. Thanks.
>> >
>> >
>> > On 11/15/2017 09:52 AM, Reuven Lax wrote:
>> >>
>> >> Romain,
>> >>
>> >> I think the @StableReplay semantic that Kenn proposed a month or so
>> >> ago is what is needed here.
>> >>
>> >> Essentially, it ensures that the GroupByKey iterable is stable and
>> >> checkpointed, so on replay the consumer of the GroupByKey is
>> >> guaranteed to receive exactly the same iterable as it did before.
>> >> The annotation can be put on a ParDo as well, in which case it
>> >> ensures stability (and checkpointing) of the individual ParDo
>> >> elements.
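>> >>
>> >> A minimal sketch of how that could look (the annotation is still a
>> >> proposal, so its name and placement are illustrative only, and
>> >> externalStore is a placeholder for whatever sink client is used):
>> >>
>> >>   // Hypothetical: @StableReplay is the proposed annotation, not yet
>> >>   // part of the Beam SDK.
>> >>   @StableReplay
>> >>   static class WriteFn extends DoFn<KV<String, Iterable<Long>>, Void> {
>> >>     @ProcessElement
>> >>     public void processElement(ProcessContext c) {
>> >>       // On a retry, c.element() carries exactly the same iterable as
>> >>       // on the first attempt, because the GroupByKey output was
>> >>       // checkpointed before this ParDo ran.
>> >>       externalStore.write(c.element().getKey(), c.element().getValue());
>> >>     }
>> >>   }
>> >>
>> >>   input
>> >>       .apply(GroupByKey.<String, Long>create())
>> >>       .apply(ParDo.of(new WriteFn()));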
>> >>
>> >> Reuven
>> >>
>> >> On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau
>> >> <rmannibu...@gmail.com>
>> >> wrote:
>> >>
>> >>> 2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>> >>>>
>> >>>> Hi Romain,
>> >>>>
>> >>>> You are right: currently, the chunking is related to bundles, and
>> >>>> today the bundle size is under the runner's responsibility.
>> >>>>
>> >>>> I think that's fine, because only the runner knows an efficient
>> >>>> bundle size. I'm afraid giving "control" of the bundle size to the
>> >>>> end user (via the pipeline) could result in huge performance issues
>> >>>> depending on the runner.
>> >>>>
>> >>>> It doesn't mean that we can't use an uber layer: it's what we do in
>> >>>> ParDoWithBatch or in the IO sink DoFns where we have a batch size.
>> >>>>
>> >>>> Anyway, the core problem is about the checkpoint: why is a
>> >>>> checkpoint not "respected" by an IO or runner?
>> >>>
>> >>> Take the example of a runner deciding the bundle size is 4 while the
>> >>> IO decides the commit interval (batch semantics) is 2: what happens
>> >>> if the 3rd record fails? You have already pushed 2 records to the
>> >>> store, they can be reprocessed by a restart of the bundle, and you
>> >>> can get duplicates.
>> >>>
>> >>> Rephrased: I think we need, as a framework, a reliable batch/chunk
>> >>> solution. I understand that bundles are mapped onto the runner and
>> >>> not really user-controlled, but can we get something more reliable
>> >>> for the user? Maybe we need a @BeforeBatch or something like that.
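>> >>>
>> >>> Something in this spirit (a completely hypothetical API, just to
>> >>> show the shape I mean; Record and store are placeholders):
>> >>>
>> >>>   public class ChunkedWriteFn extends DoFn<Record, Void> {
>> >>>     private final List<Record> buffer = new ArrayList<>();
>> >>>
>> >>>     // Hypothetical: called when a new chunk starts, aligned with a
>> >>>     // checkpoint, so a replay restarts at a chunk boundary instead
>> >>>     // of in the middle of a commit.
>> >>>     @BeforeBatch
>> >>>     public void beginChunk() { buffer.clear(); }
>> >>>
>> >>>     @ProcessElement
>> >>>     public void process(ProcessContext c) { buffer.add(c.element()); }
>> >>>
>> >>>     // Hypothetical: the runner would checkpoint right after this
>> >>>     // returns, so the commit and the checkpoint move together.
>> >>>     @AfterBatch
>> >>>     public void commitChunk() { store.commit(buffer); }
>> >>>   }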
>> >>>
>> >>>>
>> >>>> Regards
>> >>>> JB
>> >>>>
>> >>>>
>> >>>> On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:
>> >>>>>
>> >>>>>
>> >>>>> Hi guys,
>> >>>>>
>> >>>>> The subject is a bit provocative, but the topic is real and comes
>> >>>>> up again and again with Beam usage: how can a DoFn handle some
>> >>>>> "chunking"?
>> >>>>>
>> >>>>> The need is to be able to commit every N records, with N not too
>> >>>>> big.
>> >>>>>
>> >>>>> The natural API for that in Beam is the bundle, but bundles are not
>> >>>>> reliable since they can be very small (Flink) - we can say that is
>> >>>>> "ok" even if it has some perf impact - or too big (Spark uses total
>> >>>>> size / #workers).
>> >>>>>
>> >>>>> The workaround is what we see in the ES I/O: a maxSize which does
>> >>>>> an eager flush. The issue is that the checkpoint is then not
>> >>>>> respected and you can process the same records multiple times.
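>> >>>>>
>> >>>>> For reference, that workaround looks roughly like this (a simplified
>> >>>>> sketch, not the actual ElasticsearchIO code; client is a placeholder
>> >>>>> for the sink client):
>> >>>>>
>> >>>>>   public class EagerFlushFn extends DoFn<String, Void> {
>> >>>>>     private static final int MAX_SIZE = 100;
>> >>>>>     private transient List<String> batch;
>> >>>>>
>> >>>>>     @StartBundle
>> >>>>>     public void startBundle() { batch = new ArrayList<>(); }
>> >>>>>
>> >>>>>     @ProcessElement
>> >>>>>     public void process(ProcessContext c) {
>> >>>>>       batch.add(c.element());
>> >>>>>       if (batch.size() >= MAX_SIZE) {
>> >>>>>         flush(); // the side effect happens before any checkpoint...
>> >>>>>       }
>> >>>>>     }
>> >>>>>
>> >>>>>     @FinishBundle
>> >>>>>     public void finishBundle() { flush(); }
>> >>>>>
>> >>>>>     private void flush() {
>> >>>>>       // ...so if the bundle is retried, everything already flushed
>> >>>>>       // is written a second time: duplicates.
>> >>>>>       client.write(batch);
>> >>>>>       batch.clear();
>> >>>>>     }
>> >>>>>   }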
>> >>>>>
>> >>>>> Any plan to make this API reliable and controllable from a Beam
>> >>>>> point of view (at least as a maximum)?
>> >>>>>
>> >>>>> Thanks,
>> >>>>> Romain Manni-Bucau
>> >>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >>>>>
>> >>>>
>> >>>> --
>> >>>> Jean-Baptiste Onofré
>> >>>> jbono...@apache.org
>> >>>> http://blog.nanthrax.net
>> >>>> Talend - http://www.talend.com
>> >>>
>> >>>
>> >>
>> >
>> > --
>> > Jean-Baptiste Onofré
>> > jbono...@apache.org
>> > http://blog.nanthrax.net
>> > Talend - http://www.talend.com
>>
