It sounds like a good start. I'm not sure how a group by key (and not
by size) can help control the checkpointing interval. I wonder if we
shouldn't have a CheckpointPolicy { boolean shouldCheckpoint() } used
in the processing event loop. The default could be up to the runner,
but if set on the transform (or DoFn) it would control when the
checkpoint is done. Thinking out loud, it sounds close to the JBatch
checkpoint algorithm
(https://docs.oracle.com/javaee/7/api/javax/batch/api/chunk/CheckpointAlgorithm.html).
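
To make it concrete, a rough sketch of the contract I have in mind -
CheckpointPolicy and CountingCheckpointPolicy are hypothetical names,
nothing like this exists in Beam today; it just mirrors the JBatch
CheckpointAlgorithm idea:

    // hypothetical SPI the runner would consult in its event loop
    public interface CheckpointPolicy {
        // called after each processed element; returning true asks
        // the runner to checkpoint now
        boolean shouldCheckpoint();
    }

    // example policy: checkpoint every N records
    public class CountingCheckpointPolicy implements CheckpointPolicy {
        private final int interval;
        private int processed;

        public CountingCheckpointPolicy(int interval) {
            this.interval = interval;
        }

        @Override
        public boolean shouldCheckpoint() {
            return ++processed % interval == 0;
        }
    }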

Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
> Yes, @StableReplay, that's the annotation. Thanks.
>
>
> On 11/15/2017 09:52 AM, Reuven Lax wrote:
>>
>> Romain,
>>
>> I think the @StableReplay semantic that Kenn proposed a month or so ago is
>> what is needed here.
>>
>> Essentially it will ensure that the GroupByKey iterable is stable and
>> checkpointed, so on replay the consumer of the GroupByKey is
>> guaranteed to receive the exact same iterable as it did before. The
>> annotation can also be put on a ParDo, in which case it ensures
>> stability (and checkpointing) of the individual ParDo elements.
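>>
>> To illustrate the intended usage - a hypothetical sketch, since
>> @StableReplay is still a proposal; WriteFn, Record and writeToSink
>> are made-up names:
>>
>>     import org.apache.beam.sdk.transforms.DoFn;
>>
>>     // proposed semantic: the runner checkpoints the input elements
>>     // before this DoFn processes them, so a replayed bundle sees
>>     // exactly the same elements as the failed attempt did
>>     @StableReplay
>>     class WriteFn extends DoFn<Record, Void> {
>>         @ProcessElement
>>         public void process(ProcessContext ctx) {
>>             // the side effect can be retried/deduplicated because
>>             // the replayed input is identical
>>             writeToSink(ctx.element());
>>         }
>>     }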
>>
>> Reuven
>>
>> On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau
>> <rmannibu...@gmail.com>
>> wrote:
>>
>>> 2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>>>>
>>>> Hi Romain,
>>>>
>>>> You are right: currently, the chunking is tied to bundles, and
>>>> today the bundle size is under the runner's responsibility.
>>>>
>>>> I think it's fine because only the runner knows an efficient bundle
>>>> size. I'm afraid giving "control" of the bundle size to the end
>>>> user (via the pipeline) could result in huge performance issues
>>>> depending on the runner.
>>>>
>>>> It doesn't mean we can't use a layer on top: it's what we do in
>>>> ParDoWithBatch or in the DoFn of an IO sink, where we have a batch
>>>> size.
>>>>
>>>> Anyway, the core problem is the checkpoint: why isn't a checkpoint
>>>> "respected" by an IO or a runner?
>>>
>>>
>>>
>>> Take the example of a runner deciding the bundle size is 4 and the
>>> IO deciding the commit interval (batch semantics) is 2: what happens
>>> if the 3rd record fails? You have already pushed 2 records to the
>>> store, the restart of the bundle reprocesses them, and you can get
>>> duplicates.
>>>
>>> Rephrased: I think we need, as a framework, a reliable batch/chunk
>>> solution. I understand bundles are mapped onto the runner and not
>>> really controllable, but can we get something more reliable for the
>>> user? Maybe we need a @BeforeBatch or something like that - see the
>>> sketch below.
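>>>
>>> A rough sketch of that idea - @BeforeBatch and @AfterBatch are
>>> hypothetical annotations, not an existing Beam API; the runner would
>>> be expected to align its checkpoint with the @AfterBatch call so a
>>> committed chunk is never replayed:
>>>
>>>     class CommitEveryBatchFn extends DoFn<Record, Record> {
>>>         private final List<Record> pending = new ArrayList<>();
>>>
>>>         @BeforeBatch // hypothetical: a new chunk starts
>>>         public void begin() {
>>>             pending.clear();
>>>         }
>>>
>>>         @ProcessElement
>>>         public void onElement(ProcessContext ctx) {
>>>             pending.add(ctx.element());
>>>             ctx.output(ctx.element());
>>>         }
>>>
>>>         @AfterBatch // hypothetical: commit, then the runner
>>>         public void commit() { // checkpoints right after this returns
>>>             flushToStore(pending);
>>>             pending.clear();
>>>         }
>>>
>>>         private void flushToStore(List<Record> records) {
>>>             // write the chunk to the external system (placeholder)
>>>         }
>>>     }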
>>>
>>>>
>>>> Regards
>>>> JB
>>>>
>>>>
>>>> On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:
>>>>>
>>>>>
>>>>> Hi guys,
>>>>>
>>>>> The subject is a bit provocative, but the topic is real and comes
>>>>> up again and again with Beam usage: how can a DoFn handle some
>>>>> "chunking"?
>>>>>
>>>>> The need is to be able to commit every N records, with N not too big.
>>>>>
>>>>> The natural API for that in Beam is the bundle one, but bundles
>>>>> are not reliable since they can be very small (Flink) - we can say
>>>>> that is "ok" even if it has some perf impact - or too big (Spark
>>>>> does full size / #workers).
>>>>>
>>>>> The workaround is what we see in the ES I/O: a maxSize which
>>>>> triggers an eager flush. The issue is that the checkpoint is then
>>>>> not respected and you can process the same records multiple times.
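>>>>>
>>>>> A simplified sketch of that eager-flush pattern to show where it
>>>>> breaks - Record, flushToStore and the max size of 2 are
>>>>> placeholders, not the actual ElasticsearchIO code:
>>>>>
>>>>>     import java.util.ArrayList;
>>>>>     import java.util.List;
>>>>>     import org.apache.beam.sdk.transforms.DoFn;
>>>>>
>>>>>     class EagerFlushFn extends DoFn<Record, Void> {
>>>>>         private static final int MAX_SIZE = 2;
>>>>>         private final List<Record> buffer = new ArrayList<>();
>>>>>
>>>>>         @ProcessElement
>>>>>         public void process(ProcessContext ctx) {
>>>>>             buffer.add(ctx.element());
>>>>>             if (buffer.size() >= MAX_SIZE) {
>>>>>                 // eager flush: writes *before* the bundle
>>>>>                 // completes, i.e. before the runner checkpoints
>>>>>                 flushToStore(buffer);
>>>>>                 buffer.clear();
>>>>>             }
>>>>>         }
>>>>>
>>>>>         @FinishBundle
>>>>>         public void finish() {
>>>>>             flushToStore(buffer); // only this flush is aligned
>>>>>             buffer.clear();       // with the checkpoint boundary
>>>>>         }
>>>>>
>>>>>         private void flushToStore(List<Record> records) {
>>>>>             // write to the external system (placeholder)
>>>>>         }
>>>>>     }
>>>>>
>>>>> If the bundle fails after an eager flush, the runner replays the
>>>>> whole bundle and the already-flushed records get written again.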
>>>>>
>>>>> Any plan to make this API reliable and controllable from a Beam
>>>>> point of view (at least as a maximum)?
>>>>>
>>>>> Thanks,
>>>>> Romain Manni-Bucau
>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>>>>
>>>>
>>>> --
>>>> Jean-Baptiste Onofré
>>>> jbono...@apache.org
>>>> http://blog.nanthrax.net
>>>> Talend - http://www.talend.com
>>>
>>>
>>
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
