It sounds reasonable to me.

And I agree regarding Spark: I would like to merge the Spark 2 update first.

Regards
JB

On 11/30/2017 03:09 PM, Romain Manni-Bucau wrote:
Guys,

what about moving getMaxBundleSize from the Flink options to the pipeline
options? I think all runners can support it, right? The Spark code probably
needs the v2 merge before this can be implemented, but I don't see any
blocker.

wdyt?

Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-19 8:19 GMT+01:00 Romain Manni-Bucau <[email protected]>:
@Eugene: "workaround" because it is specific to the IO each time, and
therefore it still highlights a gap in the core.

Other comments inline


2017-11-19 7:40 GMT+01:00 Robert Bradshaw <[email protected]>:
There is a possible fourth issue that we don't handle well: efficiency. For
very large bundles, it may be advantageous to avoid replaying a bunch of
idempotent operations if there were a way to record which ones we're sure
went through. Not sure if that's the issue here (though one could possibly
do this with SDFs, where one can preemptively return periodically before an
element (or portion thereof) is done).

+1, this also leads to the IO handling its own chunking/bundles and
therefore solves all issues at once.


On Sat, Nov 18, 2017 at 6:58 PM, Eugene Kirpichov <
[email protected]> wrote:

I disagree that the usage of document id in ES is a "workaround" - it does
not address any *accidental* complexity
<https://en.wikipedia.org/wiki/No_Silver_Bullet> coming from shortcomings
of Beam, it addresses the *essential* complexity that a distributed system
forces one to accept as a fact of nature: the same write
(mutation) will happen multiple times, so if you want a mutation to happen
"as-if" it happened exactly once, the mutation itself must be idempotent
<https://en.wikipedia.org/wiki/Idempotence>. Insert-with-id (upsert
<https://en.wikipedia.org/wiki/Merge_(SQL)>) is a classic example of an
idempotent mutation, and it's very good that Elasticsearch provides it - if
it didn't, no matter how good of an API Beam had, achieving exactly-once
writes would be theoretically impossible. Are we in agreement on this so
far?
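
To make the idempotence argument above concrete, here is a minimal sketch in plain Java. It does not use Beam or Elasticsearch: the in-memory list and map are hypothetical stand-ins for an external store, and the method names are assumptions made for illustration. It only shows that replaying an append duplicates data, while replaying an insert-with-id does not.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for an external store; not a Beam or Elasticsearch API. */
final class IdempotentWriteSketch {

    /** Append-only insert: replaying the same write adds a duplicate. */
    static void insert(List<String> store, String doc) {
        store.add(doc);
    }

    /** Insert-with-id (upsert): replaying the same write leaves the store unchanged. */
    static void upsert(Map<String, String> store, String id, String doc) {
        store.put(id, doc);
    }

    public static void main(String[] args) {
        List<String> appendOnly = new ArrayList<>();
        Map<String, String> keyed = new HashMap<>();

        // Simulate a retried bundle: the same mutation is delivered twice.
        for (int attempt = 0; attempt < 2; attempt++) {
            insert(appendOnly, "{\"user\":\"a\"}");
            upsert(keyed, "doc-1", "{\"user\":\"a\"}");
        }

        System.out.println("append-only size: " + appendOnly.size()); // grows with every replay
        System.out.println("upsert size: " + keyed.size());           // unaffected by replays
    }
}
```

The same reasoning applies to a batch of upserts: replaying the whole batch leaves the store in the same state.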

Next: you seem to be discussing 3 issues together, all of which are valid
issues, but they seem unrelated to me:
1. Exactly-once mutation
2. Batching multiple mutations into one RPC.
3. Backpressure

#1: was considered above. The system the IO is talking to has to support
idempotent mutations, in an IO-specific way, and the IO has to take
advantage of them, in the IO-specific way - end of story.

Agreed, but don't forget the original point was about "chunks" and not
individual records.


#2: a batch of idempotent operations is also idempotent, so this doesn't
add anything new semantically. Syntactically - Beam already allows you to
write your own batching by notifying you of permitted batch boundaries
(Start/FinishBundle). Sure, it could do more, but from my experience the
batching in IOs I've seen is one of the easiest and least error-prone
parts, so I don't see something worth an extended discussion here.
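
As an illustration of the batching pattern described above, here is a minimal plain-Java sketch. It mimics the DoFn lifecycle (start bundle, process element, finish bundle) without depending on Beam; the class name, the maxBatchSize parameter and the flush callback are assumptions made for this example, not an existing API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Sketch of buffering within a bundle. The three lifecycle methods stand in for
 * the bundle callbacks of a DoFn; nothing here is a real Beam class.
 */
final class BundleBatcherSketch<T> {
    private final int maxBatchSize;
    private final Consumer<List<T>> flusher; // stands in for one bulk RPC
    private List<T> buffer = new ArrayList<>();

    BundleBatcherSketch(int maxBatchSize, Consumer<List<T>> flusher) {
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be >= 1");
        }
        this.maxBatchSize = maxBatchSize;
        this.flusher = flusher;
    }

    /** Called at a permitted batch boundary: start with an empty buffer. */
    void startBundle() {
        buffer = new ArrayList<>();
    }

    /** Buffers the element and flushes eagerly once the batch is full. */
    void processElement(T element) {
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    /** Called at the end of the bundle: nothing may stay buffered past this point. */
    void finishBundle() {
        flush();
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flusher.accept(new ArrayList<>(buffer)); // hand over a copy, then reuse the buffer
        buffer.clear();
    }

    public static void main(String[] args) {
        List<List<Integer>> rpcs = new ArrayList<>();
        BundleBatcherSketch<Integer> fn = new BundleBatcherSketch<>(3, rpcs::add);
        fn.startBundle();
        for (int i = 1; i <= 7; i++) {
            fn.processElement(i);
        }
        fn.finishBundle();
        System.out.println(rpcs);
    }
}
```

Note that the eager flush bounds the size of each RPC, not the size of the bundle: if the runner retries the bundle, batches that were already flushed are sent again, which is why the writes themselves need to be idempotent.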

"Beam already allows you to
  write your own batching by notifying you of permitted batch boundaries
  (Start/FinishBundle)"

This is wrong, since the bundle is potentially the whole PCollection (Spark),
so it is not even an option until you use SDF (back to the same
point).
Once again the API looks fine, but no implementation makes it true. It
would be easy to change in Spark; Flink may be OK since it mostly targets
the streaming case; I'm not sure about the others. Any idea?



#3: handling backpressure is a complex problem with multiple facets: 1) how
do you know you're being throttled, and by how much are you exceeding the
external system's capacity?

This is the whole point of backpressure: the system sends it back to
you (through a header or a status, in general).

2) how do you communicate this signal to the
runner?

You are a client, so you get the metadata in the response, whatever the technology.

3) what does the runner do in response?

The runner does nothing, but the IO adapts its handling as mentioned before
(wait and retry, skip, ... depending on the config).

4) how do you wait until
it's ok to try again?

This is one point that Beam could probably improve, but waiting during
processing is an option if the source has some buffering; otherwise it
requires a fallback buffer with a max size when the wait mode is
activated.

You seem to be advocating for solving one facet of this problem, which is:
you want it to be possible to signal to the runner "I'm being throttled,
please end the bundle", right? If so - I think this (ending the bundle) is
unnecessary: the DoFn can simply do an exponential back-off sleep loop.
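
For reference, an exponential back-off sleep loop of the kind mentioned above could look like the following plain-Java sketch. ThrottledException, callWithBackoff and all parameter names are hypothetical; this is not the DatastoreIO code and it does not use Beam utilities. A production version would usually also add random jitter to the delay.

```java
import java.util.concurrent.Callable;

final class BackoffSketch {

    /** Hypothetical signal that the external system is throttling the caller. */
    static final class ThrottledException extends Exception {
        ThrottledException(String message) {
            super(message);
        }
    }

    /**
     * Calls the action and, while it reports throttling, sleeps with a delay that
     * doubles after each failed attempt (capped at maxDelayMillis). Gives up and
     * rethrows once maxAttempts calls have been made.
     */
    static <T> T callWithBackoff(Callable<T> action, int maxAttempts,
                                 long initialDelayMillis, long maxDelayMillis) throws Exception {
        long delay = initialDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (ThrottledException e) {
                if (attempt >= maxAttempts) {
                    throw e; // let the caller (or the runner's retry) deal with it
                }
                Thread.sleep(delay);
                delay = Math.min(delay * 2, maxDelayMillis);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated write that is throttled on the first two calls.
        String result = callWithBackoff(() -> {
            if (++calls[0] < 3) {
                throw new ThrottledException("rate limit hit");
            }
            return "written";
        }, 5, 10, 1000);
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```

The loop runs inside the processing call, so the bundle simply takes longer; no signal to the runner is involved.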

Agreed, I never said the runner should know, but GBK+output doesn't work
because you don't own the GBK.

This is e.g. what DatastoreIO does
<https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java#L1318>,
and this is in general how most systems I've seen handle backpressure. Is there
something I'm missing? In particular, is there any compelling reason why
you think it'd be beneficial e.g. for DatastoreIO to commit the results of
the bundle so far before processing other elements?

It was more about making sure you validate a subset of the whole
bundle early and avoid reprocessing it if something fails later.


So to summarize I see 2 outcomes:

1. implement SDF in all runners
2. make the bundle size upper bounded, through a pipeline option, in
all runners; I'm not sure this one is doable everywhere since I mainly
checked the Spark case


Again, it might be that I'm still misunderstanding what you're trying to
say. One of the things it would help to clarify would be - exactly what do
you mean by "how batch frameworks solved that for years": can you point at
an existing API in some other framework that achieves what you want?

On Sat, Nov 18, 2017 at 1:02 PM Romain Manni-Bucau <[email protected]>
wrote:

Eugene, the point (and the issue with a single sample) is that you can
always find *workarounds* on a case-by-case basis, like the id one with ES,
but Beam doesn't solve the problem as a framework.

From my past experience, I really don't see why batch frameworks have
solved this for years and Beam is not able to. Keep in mind it is the same
kind of technology; it just uses different sources and bigger clusters, so
there is no real reason not to have the same feature quality. The only
potential reason I see is that there is no end-to-end tracking of state in
the cluster, but I don't see why there wouldn't be. Am I missing something
here?

An example could be a GitHub crawler, based on a REST client, computing
stats on all GitHub repos. You will need to handle the rate limit, and you
will likely want to "commit" each time you reach it, probably with some
buffering strategy with a max size before really waiting. How do you do
that with a GBK independent of your DoFn? You are not able to compose the
fns correctly :(.


On Nov 18, 2017 at 20:48, "Eugene Kirpichov" <[email protected]> wrote:

After giving this thread my best attempt at understanding exactly what is
the problem and the proposed solution, I'm afraid I still fail to
understand both. To reiterate, I think the only way to make progress here
is to be more concrete: (quote) take some IO that you think could be easier
to write with your proposed API, give the contents of a hypothetical
PCollection being written to this IO, give the code of a hypothetical DoFn
implementing the write using your API, and explain what you'd expect to
happen at runtime. I'm going to re-engage in this thread after such an
example is given.

On Sat, Nov 18, 2017, 5:00 AM Romain Manni-Bucau <[email protected]>
wrote:

First, bundle retry is unusable with some runners like Spark, where the
bundle size is the collection size / number of workers. This means a user
can't use the bundle API or feature reliably and portably, which is Beam's
promise. Aligning chunking and bundles would guarantee that, but it may not
be desired, which is why I thought it could be another feature.

GBK works until the IO has to know about it, and the two concepts are not
always orthogonal; backpressure-like systems are a trivial, common example.
This means the IO (DoFn) must be able to do it itself at some point.

Also note that the GBK works only if the IO can take a list, which is never
the case today.

The big questions for me are: is SDF the way to go, since it provides the
needed API but is not yet supported? What about existing IOs? Should Beam
provide an automatic wrapping of DoFns for that pre-aggregated support and
simulate bundles for the actual IO implementation to keep the existing API?


On Nov 17, 2017 at 19:20, "Raghu Angadi" <[email protected]> wrote:

On Fri, Nov 17, 2017 at 1:02 AM, Romain Manni-Bucau <[email protected]> wrote:

Yep, just take ES IO: if part of a bundle fails you are in an
unmanaged state. This is the case for all O (of IO ;)). The issue is not
so much about "1" (the code it takes) but more the fact that it doesn't
integrate with runner features, and potentially retries: what happens
if a bundle has a failure? => undefined today. 2. I'm fine with it
as long as we know exactly what happens when we restart after a bundle
failure. With ES the timestamp can be used, for instance.


This deterministic batching can be achieved even now with an extra
GroupByKey (and if you want ordering on top of that, you will need another
GBK). I don't know if that is too costly in your case. I would need a bit
more detail on handling ES IO write retries to see if it could be
simplified. Note that retries occur with or without any failures in your
DoFn.

The biggest negative with the GBK approach is that it doesn't provide the
same guarantees on Flink.

I don't see how GroupIntoBatches in Beam provides specific guarantees on
deterministic batches.

Thinking about it, SDF really is a way to do it: since the SDF will
manage the bulking, and combined with the runner "retry", it seems to
cover the needs.

Romain Manni-Bucau


2017-11-17 9:23 GMT+01:00 Eugene Kirpichov <[email protected]>:
I must admit I'm still failing to understand the problem, so let's step
back even further.

Could you give an example of an IO that is currently difficult to implement
specifically because of lack of the feature you're talking about?

I'm asking because I've reviewed almost all Beam IOs and don't recall
seeing a similar problem. Sure, a lot of IOs do batching within a bundle,
but 1) it doesn't take up much code (granted, it would be even easier if
Beam did it for us) and 2) I don't remember any of them requiring the
batches to be deterministic, and I'm having a hard time imagining what kind
of storage system would be able to deduplicate if batches were
deterministic but wouldn't be able to deduplicate if they weren't.

On Thu, Nov 16, 2017 at 11:50 PM Romain Manni-Bucau <[email protected]> wrote:

Ok, let me try to step back and summarize what we have today and what I
miss:

1. we can handle chunking in Beam through group in batch (or
equivalent), but:
    > it is not built into the transforms (IO) and it is controlled
from outside the transforms, so there is no way for a transform to do it
properly without itself handling a composition and links between
multiple DoFns to get notifications and potentially react properly or
handle backpressure from its backend
2. there is no restart feature because there is no real state handling
at the moment. This sounds fully delegated to the runner, but I was
hoping to have more guarantees from the API used, to be able to restart
a pipeline (mainly batch, since for streams it can be irrelevant or
delegated to the backend) and handle only the uncommitted records, so it
requires some persistence outside the main IO storages to do it
properly
    > note this is somewhat similar to the monitoring topic, which lacks
persistence ATM, so Beam could end up having a pluggable storage
for a few concerns


Short term I would be happy with 1 solved properly; long term I hope 2
will be tackled without workarounds requiring custom wrapping of IOs to
use a custom state persistence.



Romain Manni-Bucau


2017-11-17 7:44 GMT+01:00 Jean-Baptiste Onofré <[email protected]>:
Thanks for the explanation. Agree, we might talk about different things
using the same wording.

I'm also struggling to understand the use case (for a generic DoFn).

Regards
JB


On 11/17/2017 07:40 AM, Eugene Kirpichov wrote:

To avoid spending a lot of time pursuing a false path, I'd like to say
straight up that SDF is definitely not going to help here, despite the fact
that its API includes the term "checkpoint". In SDF, the "checkpoint"
captures the state of processing within a single element. If you're
applying an SDF to 1000 elements, it will, like any other DoFn, be applied
to each of them independently and in parallel, and you'll have 1000
checkpoints capturing the state of processing each of these elements, which
is probably not what you want.

I'm afraid I still don't understand what kind of checkpoint you need, if it
is not just deterministic grouping into batches. "Checkpoint" is a very
broad term and it's very possible that everybody in this thread is talking
about different things when saying it. So it would help if you could give a
more concrete example: for example, take some IO that you think could be
easier to write with your proposed API, give the contents of a hypothetical
PCollection being written to this IO, give the code of a hypothetical DoFn
implementing the write using your API, and explain what you'd expect to
happen at runtime.

On Thu, Nov 16, 2017 at 10:33 PM Romain Manni-Bucau <[email protected]> wrote:

@Eugene: yes, and the other alternative from Reuven too, but it is still
1. relying on timers, 2. not really checkpointed

In other words, it seems all solutions create a replayable chunk of size 1
to fake the lack of chunking in the framework. This always implies
handling the chunk outside the component (typically before it, for an
output). My point is that I think IOs need it in their own "internal", or
at least need to control it themselves, since the chunk size is part of
the IO handling most of the time.

I think JB spoke of the same "group before" trick using restrictions,
which I have to admit can work if SDFs are implemented by runners. Is
there a roadmap/status on that? Last time I checked, SDF was a great
API without support :(.



Romain Manni-Bucau


2017-11-17 7:25 GMT+01:00 Eugene Kirpichov <[email protected]>:

JB, not sure what you mean? SDFs and triggers are unrelated, and the post
doesn't mention the word. Did you mean something else, e.g. restriction
perhaps? Either way I don't think SDFs are the solution here; SDFs have to
do with the ability to split the processing of *a single element* over
multiple calls, whereas Romain I think is asking for repeatable grouping of
*multiple* elements.

Romain - does
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
do what you want?

On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré <[email protected]> wrote:

It sounds like the "Trigger" in the Splittable DoFn, no?

https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html

Regards
JB


On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote:

It gives the fn/transform the ability to save a state that it can get
back on "restart" (or whatever unit we can use, probably runner
dependent?). Without that you need to rewrite all IO usage with
something like the previous pattern, which makes the IO not
self-sufficient and raises the entry cost and usage cost of Beam
considerably.

In my mind it is exactly what jbatch/spring-batch uses, but adapted to
the Beam (stream in particular) case.

Romain Manni-Bucau


2017-11-17 6:49 GMT+01:00 Reuven Lax <[email protected]>:

Romain,

Can you define what you mean by checkpoint? What are the semantics, what
does it accomplish?

Reuven

On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau <[email protected]> wrote:

Yes, what I proposed earlier was:

I. checkpoint marker:

@AnyBeamAnnotation
@CheckpointAfter
public void someHook(SomeContext ctx);


II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new CountingAlgo()))

III. (I like this one less)

// in the dofn
@CheckpointTester
public boolean shouldCheckpoint();

IV. @Checkpointer Serializable getCheckpoint(); in the dofn, per element





Romain Manni-Bucau


2017-11-17 6:06 GMT+01:00 Raghu Angadi <[email protected]>:

How would you define it (rough API is fine)? Without more details, it is
not easy to see wider applicability and feasibility in runners.

On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau <[email protected]> wrote:

This is a fair summary of the current state, but it is also where Beam can
add very strong value and make big data great and smooth.

Instead of this replay feature, isn't checkpointing desirable? In
particular with SDF, no?


On Nov 16, 2017 at 19:50, "Raghu Angadi" <[email protected]> wrote:

Core issue here is that there is no explicit concept of 'checkpoint' in
Beam (UnboundedSource has a method 'getCheckpointMark' but that refers to
the checkpoint on external source). Runners do checkpoint internally as
implementation detail. Flink's checkpoint model is entirely different from
Dataflow's and Spark's.

@StableReplay helps, but it does not explicitly talk about a checkpoint by
design.

If you are looking to achieve some guarantees with a sink/DoFn, I think it
is better to start with the requirements. I worked on exactly-once sink for
Kafka (see KafkaIO.write().withEOS()), where we essentially reshard the
elements and assign sequence numbers to elements within each shard.
Duplicates in replays are avoided based on these sequence numbers. DoFn
state API is used to buffer out-of-order replays. The implementation
strategy works in Dataflow but not in Flink which has a horizontal
checkpoint. KafkaIO checks for compatibility.
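
The sequence-number idea described above can be sketched in plain Java as follows. This is only an illustration of the general technique, not the KafkaIO implementation: the class, its fields and the in-memory "written" list are hypothetical, and a real sink would have to persist the per-shard state atomically with the write.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sink that deduplicates replays using per-shard sequence numbers. */
final class SequencedSinkSketch {
    // Per shard: the next sequence number that has not been written yet.
    private final Map<Integer, Long> nextSeq = new HashMap<>();
    // Per shard: elements that arrived ahead of their turn, ordered by sequence number.
    private final Map<Integer, TreeMap<Long, String>> pending = new HashMap<>();
    // Stands in for the external system.
    final List<String> written = new ArrayList<>();

    void accept(int shard, long seq, String value) {
        long expected = nextSeq.getOrDefault(shard, 0L);
        if (seq < expected) {
            return; // replayed duplicate: this sequence number was already written
        }
        TreeMap<Long, String> buffer = pending.computeIfAbsent(shard, s -> new TreeMap<>());
        buffer.put(seq, value); // a replay of a buffered element just overwrites itself

        // Write out every buffered element that is now contiguous with what was written.
        while (!buffer.isEmpty() && buffer.firstKey() == expected) {
            written.add(buffer.pollFirstEntry().getValue());
            expected++;
        }
        nextSeq.put(shard, expected);
    }

    public static void main(String[] args) {
        SequencedSinkSketch sink = new SequencedSinkSketch();
        sink.accept(0, 0, "a");
        sink.accept(0, 2, "c"); // arrives early, gets buffered
        sink.accept(0, 0, "a"); // replay, dropped
        sink.accept(0, 1, "b"); // fills the gap, releases "c" as well
        sink.accept(0, 2, "c"); // replay, dropped
        System.out.println(sink.written);
    }
}
```

The sketch relies on the assignment of sequence numbers being stable across replays, which is the part that depends on the runner's checkpoint model.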

On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau <[email protected]> wrote:

Hi guys,

The subject is a bit provocative but the topic is real and comes up
again and again with Beam usage: how can a DoFn handle some
"chunking"?

The need is to be able to commit every N records, with N not too big.

The natural API for that in Beam is the bundle one, but bundles are not
reliable since they can be very small (Flink) - we can say it is "ok"
even if it has some perf impact - or too big (Spark does full size /
#workers).

The workaround is what we see in the ES I/O: a maxSize which does an
eager flush. The issue is that the checkpoint is then not respected
and you can process the same records multiple times.

Any plan to make this API reliable and controllable from a Beam point
of view (at least with a max size)?

Thanks,
Romain Manni-Bucau





--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com




