And the control is given to the DoFn developer via annotations, right ?

So, bundle would be "hidden" and be internal to the runner (which makes sense I think) and we introduce "control" points for the DoFn developer that the runner will deal with.

Correct ?

Regards
JB

On 11/15/2017 10:58 AM, Romain Manni-Bucau wrote:
Overall goal is to ensure each 100 elements max, a "backend" (as
datastore) flush/commit/push is done and is aligned with beam
checkpoints. You can see it as bringing the "general" commit-interval
notion to beam and kind of get rid of the bundle notion which is
almost impossible to use today.

Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-15 10:27 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
It's in the dev list archives, not sure if there's a doc yet.

I'm not quite sure I understand what you mean by a "flush" Can you describe
the problem you're trying to solve?

Reuven

On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
wrote:

Hmm, I didn't find the doc - if you have the link not far it would be
appreciated - but "before" sounds not enough, it should be "after" in
case there was a "flush" no?

Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-15 10:10 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
If you set @StableReplay before a ParDo, it forces a checkpoint before
that
ParDo.

On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau <
rmannibu...@gmail.com>
wrote:

It sounds a good start. I'm not sure how a group by key (and not by
size) can help controlling the checkpointing interval. Wonder if we
shouldn't be able to have a CheckpointPolicy { boolean
shouldCheckpoint() } used in the processing event loop. Default could
be up to the runner but if set on the transform (or dofn) it would be
used to control when the checkpoint is done. Thinking out loud it
sounds close to jbatch checkpoint algorithm
(https://docs.oracle.com/javaee/7/api/javax/batch/api/
chunk/CheckpointAlgorithm.html)

Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
Yes, @StableReplay, that's the annotation. Thanks.


On 11/15/2017 09:52 AM, Reuven Lax wrote:

Romain,

I think the @StableReplay semantic that Kenn proposed a month or so
ago
is
what is needed here.

Essentially it will ensure that the GroupByKey iterable is stable and
checkpointed. So on replay, the GroupByKey is guaranteed to receive
the
exact same iterable as it did before. The annotation can be put on a
ParDo
as well, in which case it ensures stability (and checkpointing) of
the
individual ParDo elements.

Reuven

On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau
<rmannibu...@gmail.com>
wrote:

2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:

Hi Romain,

You are right: currently, the chunking is related to bundles.
Today,
the
bundle size is under the runner responsibility.

I think it's fine because only the runner know an efficient bundle
size.

I'm

afraid giving the "control" of the bundle size to the end user (via
pipeline) can result to huge performances issue depending of the
runner.

It doesn't mean that we can't use an uber layer: it's what we do in
ParDoWithBatch or DoFn in IO Sink where we have a batch size.

Anyway, the core problem is about the checkpoint: why a checkpoint
is
not
"respected" by an IO or runner ?



Take the example of a runner deciding the bundle size is 4 and the
IO
deciding the commit-interval (batch semantic) is 2, what happens if
the 3rd record fails? You have pushed to the store 2 records which
can
be reprocessed by a restart of the bundle and you can get
duplicates.

Rephrased: I think we need as a framework a batch/chunk solution
which
is reliable. I understand bundles is mapped on the runner and not
really controlled but can we get something more reliable for the
user?
Maybe we need a @BeforeBatch or something like that.


Regards
JB


On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:


Hi guys,

The subject is a bit provocative but the topic is real and coming
again and again with the beam usage: how a dofn can handle some
"chunking".

The need is to be able to commit each N records but with N not too
big.

The natural API for that in beam is the bundle one but bundles are
not
reliable since they can be very small (flink) - we can say it is
"ok"
even if it has some perf impacts - or too big (spark does full
size /
#workers).

The workaround is what we see in the ES I/O: a maxSize which does
an
eager flush. The issue is that then the checkpoint is not
respected
and you can process multiple times the same records.

Any plan to make this API reliable and controllable from a beam
point
of view (at least in a max manner)?

Thanks,
Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com




--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Reply via email to