Hi Romain,

1. You always have the GlobalWindow at least; it's more related to the trigger.
2. How would you define this? With an annotation (on what, in that case) or using a checkpoint method?
3. Agreed on having a core PTransform for that.

Regards
JB

On 11/15/2017 02:16 PM, Romain Manni-Bucau wrote:
@Reuven: it looks like a good workaround
@Ken: thanks a lot for the link!

@all:

1. Do you think it is doable without using windowing (to have something
more reliable in terms of runners, since it would depend on fewer
primitives)?
2. What about allowing the user to define when to checkpoint?
3. Can we get this kind of "composite" pattern in the Beam core?
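
Roughly, something like the following is what I have in mind (a purely
hypothetical sketch, the names are made up and imports are omitted; it just
packages the key + trigger + GroupByKey workaround discussed below):

public class StableBatch<T>
    extends PTransform<PCollection<T>, PCollection<Iterable<T>>> {

  private final int maxBatchSize;
  private final int numShards;

  public StableBatch(int maxBatchSize, int numShards) {
    this.maxBatchSize = maxBatchSize;
    this.numShards = numShards;
  }

  @Override
  public PCollection<Iterable<T>> expand(PCollection<T> input) {
    return input
        // Spread elements over random keys so batching stays parallel.
        .apply(WithKeys.of((T e) -> ThreadLocalRandom.current().nextInt(numShards))
            .withKeyType(TypeDescriptors.integers()))
        // Emit a pane roughly every maxBatchSize elements per key.
        .apply(Window.<KV<Integer, T>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(maxBatchSize)))
            .discardingFiredPanes())
        .apply(GroupByKey.create())
        .apply(Values.create());
  }
}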



Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-15 14:12 GMT+01:00 Kenneth Knowles <k...@google.com.invalid>:
In case the connection is not clear to folks on this thread, I pinged the
thread on @StableReplay / @RequiresStableInput / etc and opened a draft PR
at https://github.com/apache/beam/pull/4135.

On Wed, Nov 15, 2017 at 3:24 AM, Reuven Lax <re...@google.com.invalid>
wrote:

so I think the following will do exactly that and can be easily factored
into a reusable transform (modulo Java type boilerplate):

pCollection
    .apply(WithKeys.of((Element e) -> ThreadLocalRandom.current().nextInt(N)))
    .apply(Window.<KV<Integer, Element>>into(new GlobalWindows())
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterPane.elementCountAtLeast(100)))
        // an accumulation mode is required whenever a trigger is set
        .discardingFiredPanes())
    .apply(GroupByKey.create())
    .apply(ParDo.of(new DoFn<KV<Integer, Iterable<Element>>, Void>() {
        @ProcessElement
        @StableReplay
        public void processElement(ProcessContext c) {
          // Insert c.element().getValue() into backend.
        }
    }));

On Wed, Nov 15, 2017 at 7:00 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

2017-11-15 11:42 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
Can we describe this at a higher level?

I think what you want is the following. Please correct if I'm
misunderstanding.

Batches of 100 elements (is this a hard requirement, or do they have to be
"approximately" 100 elements?)

Approximately is fine as long as it is documented (what is not fine is
1000000 instead of 10, for instance)


Once you see a batch, you're guaranteed to see the same batch on
retries.

+1


You want to then idempotently insert this batch into some backend. Things
may fail, workers may crash, but in that case you want to get the exact
same batch back so you can insert it again.

+1


Do you care about ordering? On failure do you have to see the same batches
in the same order as before, or is it sufficient to see the same batches?

Beam doesn't guarantee ordering everywhere, so I guess it is not important -
at least for my cases this statement is true.


Reuven

On Wed, Nov 15, 2017 at 5:58 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

The overall goal is to ensure that at most every 100 elements, a "backend"
(such as a datastore) flush/commit/push is done and is aligned with Beam
checkpoints. You can see it as bringing the "general" commit-interval
notion to Beam and kind of getting rid of the bundle notion, which is
almost impossible to use today.

Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-15 10:27 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
It's in the dev list archives, not sure if there's a doc yet.

I'm not quite sure I understand what you mean by a "flush". Can you
describe the problem you're trying to solve?

Reuven

On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

Hmm, I didn't find the doc (if you have the link handy it would be
appreciated), but "before" sounds not enough; it should be "after" in case
there was a "flush", no?

Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-15 10:10 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
If you set @StableReplay before a ParDo, it forces a checkpoint before that
ParDo.

On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

It sounds like a good start. I'm not sure how a group by key (and not by
size) can help controlling the checkpointing interval. I wonder if we
shouldn't be able to have a CheckpointPolicy { boolean shouldCheckpoint() }
used in the processing event loop. The default could be up to the runner,
but if set on the transform (or DoFn) it would be used to control when the
checkpoint is done. Thinking out loud, it sounds close to the JBatch
checkpoint algorithm
(https://docs.oracle.com/javaee/7/api/javax/batch/api/chunk/CheckpointAlgorithm.html)
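
Roughly something like this (a purely hypothetical sketch, none of these
names exist in Beam, just to illustrate the idea):

// Hypothetical API sketch, modeled on the JSR-352 CheckpointAlgorithm above.
public interface CheckpointPolicy extends java.io.Serializable {

  // Asked by the runner's processing loop after each element.
  boolean shouldCheckpoint();

  // Example policy: checkpoint every n elements.
  static CheckpointPolicy everyN(int n) {
    return new CheckpointPolicy() {
      private long count;

      @Override
      public boolean shouldCheckpoint() {
        return ++count % n == 0;
      }
    };
  }
}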

Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
Yes, @StableReplay, that's the annotation. Thanks.


On 11/15/2017 09:52 AM, Reuven Lax wrote:

Romain,

I think the @StableReplay semantic that Kenn proposed a month or so ago is
what is needed here.

Essentially it will ensure that the GroupByKey iterable is stable and
checkpointed. So on replay, the GroupByKey is guaranteed to receive the
exact same iterable as it did before. The annotation can be put on a ParDo
as well, in which case it ensures stability (and checkpointing) of the
individual ParDo elements.
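
For example, on a plain ParDo it would look roughly like this (a sketch
only, since the annotation is still just a proposal; MyRecord and
writeToBackend are placeholders):

pCollection.apply(ParDo.of(new DoFn<MyRecord, Void>() {
    @ProcessElement
    @StableReplay
    public void processElement(ProcessContext c) {
      // On retry the runner replays exactly the same element value, so the
      // external write can be made idempotent / deduplicated.
      writeToBackend(c.element());
    }
}));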

Reuven

On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:

Hi Romain,

You are right: currently, the chunking is related to bundles. Today, the
bundle size is under the runner's responsibility.

I think it's fine because only the runner knows an efficient bundle size.
I'm afraid giving the "control" of the bundle size to the end user (via the
pipeline) can result in huge performance issues depending on the runner.

It doesn't mean that we can't use an uber layer: it's what we do in
ParDoWithBatch or a DoFn in an IO sink where we have a batch size.

Anyway, the core problem is about the checkpoint: why is a checkpoint not
"respected" by an IO or runner?



Take the example of a runner deciding the bundle size is 4 and the IO
deciding the commit interval (batch semantic) is 2: what happens if the 3rd
record fails? You have pushed 2 records to the store which can be
reprocessed by a restart of the bundle, so you can get duplicates.

Rephrased: I think we need, as a framework, a batch/chunk solution which is
reliable. I understand bundles are mapped on the runner and not really
controlled, but can we get something more reliable for the user? Maybe we
need a @BeforeBatch or something like that.


Regards
JB


On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:


Hi guys,

The subject is a bit provocative, but the topic is real and comes up again
and again with Beam usage: how can a DoFn handle some "chunking"?

The need is to be able to commit every N records, but with N not too big.

The natural API for that in Beam is the bundle one, but bundles are not
reliable since they can be very small (Flink) - we can say it is "ok" even
if it has some perf impact - or too big (Spark does full size / #workers).

The workaround is what we see in the ES I/O: a maxSize which does an eager
flush. The issue is that then the checkpoint is not respected and you can
process the same records multiple times.
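
For illustration, that pattern looks roughly like this (a simplified
sketch, not the actual ElasticsearchIO code; MyRecord and flushToBackend
are placeholders):

class EagerFlushFn extends DoFn<MyRecord, Void> {
  private static final int MAX_SIZE = 100;
  private transient List<MyRecord> buffer;

  @StartBundle
  public void startBundle() {
    buffer = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    buffer.add(c.element());
    if (buffer.size() >= MAX_SIZE) {
      // Eager flush in the middle of a bundle: if the bundle fails later
      // and is retried, these records are reprocessed and written again.
      flushToBackend(buffer);
      buffer.clear();
    }
  }

  @FinishBundle
  public void finishBundle() {
    flushToBackend(buffer);
    buffer.clear();
  }

  private void flushToBackend(List<MyRecord> records) {
    // Hypothetical bulk write to the external datastore.
  }
}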

Any plan to make this API reliable and controllable from a Beam point of
view (at least in a max manner)?

Thanks,
Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com




--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com






--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
