Got it now.

AFAIR, Kenn discussed an annotation related to that recently. I don't remember the annotation's name, but basically it was a link between a group of elements (a batch) and the checkpoint.

Regards
JB

On 11/15/2017 09:49 AM, Romain Manni-Bucau wrote:
2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
Hi Romain,

You are right: currently, the chunking is related to bundles, and today the bundle size is under the runner's responsibility.

I think that's fine because only the runner knows an efficient bundle size. I'm afraid giving "control" of the bundle size to the end user (via the pipeline) could result in huge performance issues depending on the runner.

That doesn't mean we can't add a layer on top: it's what we do in ParDoWithBatch or in DoFn-based IO sinks where we have a batch size.
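
To make that pattern concrete, here is a minimal sketch of a batch size handled inside a DoFn-based sink (the class and field names are made up for illustration, and the actual write to the external store is left as a placeholder):

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;

// Sketch only: buffers elements and flushes every maxBatchSize records,
// plus once more at the end of the bundle.
class BatchingSinkFn extends DoFn<String, Void> {
  private final int maxBatchSize = 100;        // user-provided batch size
  private transient List<String> buffer;

  @StartBundle
  public void startBundle() {
    buffer = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    buffer.add(c.element());
    if (buffer.size() >= maxBatchSize) {
      flush();                                 // eager flush inside the bundle
    }
  }

  @FinishBundle
  public void finishBundle() {
    flush();                                   // flush whatever is left at the bundle boundary
  }

  private void flush() {
    if (!buffer.isEmpty()) {
      // write 'buffer' to the external store (placeholder, no real client here)
      buffer.clear();
    }
  }
}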

Anyway, the core problem is about the checkpoint: why is a checkpoint not "respected" by an IO or runner?


Take the example of a runner deciding the bundle size is 4 and the IO deciding the commit interval (batch semantics) is 2: what happens if the 3rd record fails? You have already pushed 2 records to the store, a restart of the bundle can reprocess them, and you get duplicates.
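
To make the failure mode concrete, here is a tiny toy simulation of that exact scenario (plain Java, not Beam code; the record names and the in-memory "store" are invented for illustration):

import java.util.ArrayList;
import java.util.List;

// Toy simulation: bundle of 4 records, eager commit every 2 records,
// the 3rd record fails, the runner retries the whole bundle.
public class DuplicateDemo {
  static final List<String> store = new ArrayList<>();   // the external system
  static final List<String> buffer = new ArrayList<>();  // the DoFn's in-memory batch

  static void process(String record, boolean fail) {
    if (fail) throw new RuntimeException("record failed: " + record);
    buffer.add(record);
    if (buffer.size() == 2) {          // commit interval of 2
      store.addAll(buffer);
      buffer.clear();
    }
  }

  public static void main(String[] args) {
    String[] bundle = {"r1", "r2", "r3", "r4"};
    try {
      for (String r : bundle) process(r, r.equals("r3")); // first attempt fails on r3
    } catch (RuntimeException e) {
      buffer.clear();
      for (String r : bundle) process(r, false);          // runner retries the whole bundle
    }
    System.out.println(store); // [r1, r2, r1, r2, r3, r4]
  }
}

The first two records end up in the store twice, which is exactly the duplicate problem described above.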

Rephrased: I think we need, as a framework, a batch/chunk solution which is reliable. I understand bundles are mapped to the runner and not really controlled, but can we get something more reliable for the user?
Maybe we need a @BeforeBatch or something like that.
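
Purely as a strawman of what such an API could look like (nothing below exists in Beam: @BeforeBatch is just the name floated above and @AfterBatch is an invented companion, both declared locally only so the sketch compiles):

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical annotations, declared here only so the example compiles.
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) @interface BeforeBatch {}
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) @interface AfterBatch {}

class ChunkedWriteFn extends DoFn<String, Void> {
  private final List<String> chunk = new ArrayList<>();

  @BeforeBatch
  public void beforeBatch() {
    chunk.clear();                       // the framework would open a new chunk here
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    chunk.add(c.element());
  }

  @AfterBatch
  public void afterBatch() {
    // The framework would only call this at a checkpoint-consistent boundary,
    // so the commit could not be replayed and produce duplicates.
    commit(chunk);
    chunk.clear();
  }

  private void commit(List<String> records) {
    // write 'records' to the external store (placeholder)
  }
}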


Regards
JB


On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:

Hi guys,

The subject is a bit provocative but the topic is real and comes up again and again with Beam usage: how can a DoFn handle some "chunking"?

The need is to be able to commit every N records, with N not too big.

The natural API for that in Beam is the bundle one, but bundles are not reliable since they can be very small (Flink) - we can say it is "ok" even if it has some performance impact - or too big (Spark does full size / #workers).

The workaround is what we see in the ES IO: a maxSize which does an eager flush. The issue is that the checkpoint is then not respected and the same records can be processed multiple times.
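
For contrast, a sketch of the only checkpoint-safe shape available today: no eager flush, everything is written in @FinishBundle, so a bundle retry cannot duplicate already-committed records; the downside is that the chunk size degenerates to whatever bundle size the runner picks (tiny on Flink, potentially huge on Spark). Names and the store write are placeholders:

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;

// Sketch only: the whole bundle is committed in one shot at the bundle
// boundary, the only point today that lines up with the runner's checkpointing.
class BundleAlignedSinkFn extends DoFn<String, Void> {
  private transient List<String> buffer;

  @StartBundle
  public void startBundle() {
    buffer = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    buffer.add(c.element());             // never flushed mid-bundle
  }

  @FinishBundle
  public void finishBundle() {
    // Commit everything at once; if this fails, the runner retries the whole
    // bundle and nothing was written yet, so no duplicates (assuming the
    // commit itself is atomic).
    // write 'buffer' to the external store (placeholder)
    buffer.clear();
  }
}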

Any plan to make this API reliable and controllable from a Beam point of view (at least as a maximum chunk size)?

Thanks,
Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
