I don't think it belongs in PipelineOptions, as bundle size is always a runner
thing.
We could consider adding a new generic RunnerOptions; however, I'm not convinced
all runners can actually support this.
On Thu, Nov 30, 2017 at 6:09 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
Guys,
What about moving getMaxBundleSize from the Flink options to the pipeline
options? I think all runners can support it, right? The Spark implementation
probably needs the v2 merge first, but I don't see any blocker.
wdyt?
Romain Manni-Bucau
@rmannibucau | Blog | Old Blog | Github | LinkedIn
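To make the proposal concrete, here is a plain-Java toy model (not Beam or Flink code) of what an upper-bounded bundle size would guarantee: the runner cuts the input into bundles of at most `maxBundleSize` elements, so a DoFn that flushes in its finish-bundle hook flushes at least that often on every runner. `CappedBundleRunner` and `BundleFn` are hypothetical names; the Flink runner's `getMaxBundleSize` option mentioned above plays the role of the cap there.

```java
import java.util.List;

// Toy model only: not Beam or Flink code. It shows what an upper bound on the
// bundle size means for a DoFn's start/finish-bundle hooks.
class CappedBundleRunner {

    // Hypothetical stand-in for the DoFn lifecycle methods discussed in the thread.
    interface BundleFn<T> {
        void startBundle();
        void processElement(T element);
        void finishBundle();
    }

    // Cuts the input into bundles of at most maxBundleSize elements and
    // returns how many bundles were executed.
    static <T> int run(List<T> input, long maxBundleSize, BundleFn<T> fn) {
        if (maxBundleSize < 1) {
            throw new IllegalArgumentException("maxBundleSize must be >= 1");
        }
        int bundles = 0;
        int i = 0;
        while (i < input.size()) {
            fn.startBundle();
            long inBundle = 0;
            // Feed elements until the bundle is full or the input is exhausted.
            while (i < input.size() && inBundle < maxBundleSize) {
                fn.processElement(input.get(i++));
                inBundle++;
            }
            fn.finishBundle(); // a DoFn flushing here flushes at least every maxBundleSize elements
            bundles++;
        }
        return bundles;
    }
}
```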
2017-11-19 8:19 GMT+01:00 Romain Manni-Bucau <rmannibu...@gmail.com>:
> @Eugene: I call it a "workaround" because it is specific to each IO, and
> therefore it still highlights a gap in the core.
>
> Other comments inline
>
>
> 2017-11-19 7:40 GMT+01:00 Robert Bradshaw <rober...@google.com.invalid>:
>> There is a possible fourth issue that we don't handle well: efficiency. For
>> very large bundles, it may be advantageous to avoid replaying a bunch of
>> idempotent operations if there were a way to record which ones we're sure
>> went through. Not sure if that's the issue here (though one could possibly
>> do this with SDFs: one can preemptively return periodically before an
>> element (or portion thereof) is done).
>
> +1. It also leads to the IO handling its own chunking/bundles, which
> therefore solves all the issues at once.
>
>>
>> On Sat, Nov 18, 2017 at 6:58 PM, Eugene Kirpichov <
>> kirpic...@google.com.invalid> wrote:
>>
>>> I disagree that the usage of document id in ES is a "workaround" - it does
>>> not address any *accidental* complexity
>>> <https://en.wikipedia.org/wiki/No_Silver_Bullet> coming from shortcomings
>>> of Beam; it addresses the *essential* complexity that a distributed system
>>> forces one to accept as a fact of nature: the same write (mutation) will
>>> happen multiple times, so if you want a mutation to happen "as-if" it
>>> happened exactly once, the mutation itself must be idempotent
>>> <https://en.wikipedia.org/wiki/Idempotence>. Insert-with-id (upsert
>>> <https://en.wikipedia.org/wiki/Merge_(SQL)>) is a classic example of an
>>> idempotent mutation, and it's very good that Elasticsearch provides it - if
>>> it didn't, no matter how good of an API Beam had, achieving exactly-once
>>> writes would be theoretically impossible. Are we in agreement on this so
>>> far?
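Eugene's argument can be seen in a tiny in-memory model (plain Java, not the Elasticsearch client): when the runner delivers the same bundle twice, a plain append duplicates every document, while an insert-with-id leaves the store unchanged. The class name is hypothetical, and the ids are assumed to be derived deterministically from each record.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory sink (hypothetical): contrasts a plain append with an
// insert-with-id (upsert). Not Elasticsearch or Beam code.
class UpsertVsAppendSink {
    final List<String> appended = new ArrayList<>();
    final Map<String, String> upserted = new LinkedHashMap<>();

    // Not idempotent: every replay of the same write adds another copy.
    void append(String doc) {
        appended.add(doc);
    }

    // Idempotent: replaying the same (id, doc) pair leaves the store unchanged.
    void upsert(String id, String doc) {
        upserted.put(id, doc);
    }
}
```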
>>>
>>> Next: you seem to be discussing 3 issues together, all of which are valid
>>> issues, but they seem unrelated to me:
>>> 1. Exactly-once mutation
>>> 2. Batching multiple mutations into one RPC
>>> 3. Backpressure
>>>
>>> #1: was considered above. The system the IO is talking to has to support
>>> idempotent mutations, in an IO-specific way, and the IO has to take
>>> advantage of them, in the IO-specific way - end of story.
>
> Agreed, but don't forget the original point was about "chunks", not
> individual records.
>
>>>
>>> #2: a batch of idempotent operations is also idempotent, so this doesn't
>>> add anything new semantically. Syntactically - Beam already allows you to
>>> write your own batching by notifying you of permitted batch boundaries
>>> (Start/FinishBundle). Sure, it could do more, but from my experience the
>>> batching in IOs I've seen is one of the easiest and least error-prone
>>> parts, so I don't see something worth an extended discussion here.
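The batching pattern described here usually looks like the sketch below, written in plain Java rather than as an actual Beam `DoFn` so that it runs standalone: buffer in the element hook, flush when the buffer reaches `maxBatchSize`, and flush the remainder in the finish-bundle hook. `BatchingWriter` and the `rpc` callback are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of the usual batching pattern in a write DoFn (names are
// illustrative, not Beam API): buffer per element, flush at maxBatchSize,
// and always flush what is left at the end of the bundle.
class BatchingWriter<T> {
    private final int maxBatchSize;
    private final Consumer<List<T>> rpc; // stands in for one bulk request
    private final List<T> buffer = new ArrayList<>();

    BatchingWriter(int maxBatchSize, Consumer<List<T>> rpc) {
        this.maxBatchSize = maxBatchSize;
        this.rpc = rpc;
    }

    void startBundle() {
        buffer.clear();
    }

    void processElement(T element) {
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    void finishBundle() {
        flush();
    }

    private void flush() {
        if (!buffer.isEmpty()) {
            rpc.accept(new ArrayList<>(buffer)); // copy, because the buffer is reused
            buffer.clear();
        }
    }
}
```

Batch boundaries here follow bundle boundaries: if a runner hands a whole partition over as one bundle, `finishBundle` runs only once, at the very end. The `maxBatchSize` check still bounds each request, but those eager flushes are not aligned with anything the runner checkpoints.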
>
> "Beam already allows you to
> write your own batching by notifying you of permitted batch boundaries
> (Start/FinishBundle)"
>
> This is wrong, since the bundle is potentially the whole PCollection
> (Spark), so this is not even an option unless you use SDF (back to the
> same point).
> Once again, the API looks fine, but no implementation makes it true. It
> would be easy to change in Spark; Flink can be OK since it targets the
> streaming case more; not sure about the others. Any idea?
>
>
>>>
>>> #3: handling backpressure is a complex problem with multiple facets: 1) how
>>> do you know you're being throttled, and by how much are you exceeding the
>>> external system's capacity?
>
> This is the whole point of backpressure: the system sends it back to
> you (typically via a header or a status code).
>
>>> 2) how do you communicate this signal to the
>>> runner?
>
> You are a client, so you get the metadata in the response, whatever the
> technology.
>
>>> 3) what does the runner do in response?
>
> The runner does nothing, but the IO adapts its handling as mentioned before
> (wait and retry, skip, ... depending on the config).
>
>>> 4) how do you wait until
>>> it's ok to try again?
>
> This is one point Beam could probably enhance, but waiting during
> processing is an option if the source has some buffering; otherwise it
> requires a fallback buffer with a max size if the wait mode is
> activated.
>
>>> You seem to be advocating for solving one facet of this problem, which is:
>>> you want it to be possible to signal to the runner "I'm being throttled,
>>> please end the bundle", right? If so - I think this (ending the bundle) is
>>> unnecessary: the DoFn can simply do an exponential back-off sleep loop.
>
> Agreed, I never said the runner should know, but GBK+output doesn't work
> because you don't own the GBK.
>
>>> This is e.g. what DatastoreIO does
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java#L1318>
>>> and this is in general how most systems I've seen handle backpressure. Is
>>> there something I'm missing? In particular, is there any compelling reason
>>> why you think it'd be beneficial e.g. for DatastoreIO to commit the results
>>> of the bundle so far before processing other elements?
>
> It was more about ensuring you validate a subset of the whole bundle
> early and avoid reprocessing it if it fails later.
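A minimal sketch of the exponential back-off loop discussed above, in the spirit of the DatastoreIO code linked in the thread but not copied from it. `ThrottledException` is a hypothetical signal for "the backend rejected this call because of rate limiting"; the sleeper is injectable so the delays can be observed without actually sleeping.

```java
import java.util.function.LongConsumer;
import java.util.function.Supplier;

// Sketch of retry with exponential back-off; not DatastoreIO's actual code.
class Backoff {

    // Hypothetical signal that the backend rejected a call due to rate limiting.
    static class ThrottledException extends RuntimeException {
        ThrottledException(String message) {
            super(message);
        }
    }

    // Real sleeper; tests can pass a recording LongConsumer instead.
    static final LongConsumer THREAD_SLEEP = millis -> {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during back-off", e);
        }
    };

    static <T> T callWithBackoff(Supplier<T> call, int maxAttempts,
                                 long initialDelayMillis, LongConsumer sleeper) {
        long delay = initialDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return call.get();
            } catch (ThrottledException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up; the runner may then retry the whole bundle
                }
                sleeper.accept(delay);
                delay *= 2; // double the wait after each throttled attempt
            }
        }
    }
}
```

The DoFn stays inside the same bundle while it waits, which is why no runner involvement is needed for this facet of backpressure.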
>
>
> So, to summarize, I see 2 outcomes:
>
> 1. implement SDF in all runners
> 2. make the bundle size upper-bounded (through a pipeline option) in
> all runners; not sure this one is doable everywhere since I mainly
> checked the Spark case
>
>>>
>>> Again, it might be that I'm still misunderstanding what you're trying to
>>> say. One of the things it would help to clarify would be - exactly what do
>>> you mean by "how batch frameworks solved that for years": can you point at
>>> an existing API in some other framework that achieves what you want?
>>>
>>> On Sat, Nov 18, 2017 at 1:02 PM Romain Manni-Bucau <rmannibu...@gmail.com>
>>> wrote:
>>>
>>> > Eugene, the point - and the issue with a single sample - is that you can
>>> > always find *workarounds* on a case-by-case basis, like the id one with
>>> > ES, but Beam doesn't solve the problem as a framework.
>>> >
>>> > From my past experience, I clearly don't see why batch frameworks have
>>> > solved that for years and Beam is not able to do it - keep in mind it is
>>> > the same kind of technology; it just uses different sources and bigger
>>> > clusters, so there is no real reason not to have the same feature
>>> > quality. The only potential reason I see is that there is no end-to-end
>>> > tracking of the state in the cluster. But I don't see why there
>>> > wouldn't be. Am I missing something here?
>>> >
>>> > An example could be: take a GitHub crawler computing stats on all GitHub
>>> > repos, based on a REST client for instance. You will need to handle the
>>> > rate limit and will likely want to "commit" each time you reach a rate
>>> > limit, likely with some buffering strategy with a max size before
>>> > really waiting. How do you do it with a GBK independent of your DoFn? You
>>> > are not able to compose the fns correctly with each other :(.
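The crawler example can be modeled roughly as follows. This is plain Java with hypothetical names, and the fixed-quota-per-window rate limit is an assumption (real APIs report their own limits and reset times): requests go out while quota remains, overflow is buffered up to `maxBuffered`, and only when the buffer overflows does the client block for the next window. The "commit" step is not modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of the crawler scenario (all names hypothetical): a client with a
// fixed request quota per window, a bounded buffer used while the quota is
// exhausted, and a blocking wait once that buffer overflows.
class RateLimitedClient {
    private final int quotaPerWindow;
    private final int maxBuffered;
    private final Deque<String> buffer = new ArrayDeque<>();
    private int usedInWindow = 0;
    int waits = 0; // how many times we had to block for a new quota window
    final List<String> fetched = new ArrayList<>();

    RateLimitedClient(int quotaPerWindow, int maxBuffered) {
        if (quotaPerWindow < 1) {
            throw new IllegalArgumentException("quotaPerWindow must be >= 1");
        }
        this.quotaPerWindow = quotaPerWindow;
        this.maxBuffered = maxBuffered;
    }

    void submit(String repo) {
        buffer.addLast(repo);
        drain();
        if (buffer.size() > maxBuffered) {
            waitForNextWindow(); // buffer overflowed: now we really have to wait
            drain();
        }
    }

    // Called at the end of the input (e.g. from a finish-bundle hook).
    void finish() {
        drain();
        while (!buffer.isEmpty()) {
            waitForNextWindow();
            drain();
        }
    }

    // Sends buffered requests while the current window still has quota.
    private void drain() {
        while (!buffer.isEmpty() && usedInWindow < quotaPerWindow) {
            fetched.add(buffer.removeFirst()); // stands in for the REST call
            usedInWindow++;
        }
    }

    // A real client would sleep until the rate-limit reset; here we only
    // count the wait and open a fresh window.
    private void waitForNextWindow() {
        waits++;
        usedInWindow = 0;
    }
}
```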
>>> >
>>> >
>>> > On 18 Nov 2017 20:48, "Eugene Kirpichov" <kirpic...@google.com.invalid>
>>> > wrote:
>>> >
>>> > After giving this thread my best attempt at understanding exactly what
>>> > the problem and the proposed solution are, I'm afraid I still fail to
>>> > understand both. To reiterate, I think the only way to make progress here
>>> > is to be more concrete: (quote) take some IO that you think could be
>>> > easier to write with your proposed API, give the contents of a
>>> > hypothetical PCollection being written to this IO, give the code of a
>>> > hypothetical DoFn implementing the write using your API, and explain what
>>> > you'd expect to happen at runtime. I'm going to re-engage in this thread
>>> > after such an example is given.
>>> >
>>> > On Sat, Nov 18, 2017, 5:00 AM Romain Manni-Bucau <rmannibu...@gmail.com>
>>> > wrote:
>>> >
>>> > > First, bundle retry is unusable with some runners like Spark, where the
>>> > > bundle size is the collection size / number of workers. This means a
>>> > > user can't use the bundle API or feature reliably and portably - which
>>> > > is Beam's promise. Aligning chunking and bundles would guarantee that,
>>> > > but it may not be desired, which is why I thought it could be another
>>> > > feature.
>>> > >
>>> > > GBK works as long as the IO doesn't need to know about it, but the two
>>> > > concepts are not always orthogonal - backpressure-like systems are a
>>> > > trivial common example. This means the IO (DoFn) must be able to do it
>>> > > itself at some point.
>>> > >
>>> > > Also note the GBK works only if the IO can take a list, which is never
>>> > > the case today.
>>> > >
>>> > > Big questions for me are: is SDF the way to go, since it provides the
>>> > > needed API but is not yet supported? What about existing IOs? Should
>>> > > Beam provide an auto-wrapping of DoFns for that pre-aggregated support,
>>> > > and simulate bundles for the actual IO impl to keep the existing API?
>>> > >
>>> > >
>>> > > On 17 Nov 2017 19:20, "Raghu Angadi" <rang...@google.com.invalid>
>>> > > wrote:
>>> > >
>>> > > On Fri, Nov 17, 2017 at 1:02 AM, Romain Manni-Bucau <rmannibu...@gmail.com>
>>> > > wrote:
>>> > >
>>> > > > Yep, just take ES IO: if a part of a bundle fails, you are in an
>>> > > > unmanaged state. This is the case for all O (of IO ;)). The issue is
>>> > > > not so much about "1" (the code it takes) but more the fact that it
>>> > > > doesn't integrate with runner features and, potentially, retries: what
>>> > > > happens if a bundle has a failure? => undefined today. 2. I'm fine with
>>> > > > it as long as we know exactly what happens when we restart after a
>>> > > > bundle failure. With ES, the timestamp can be used, for instance.
>>> > >
>>> > > This deterministic batching can be achieved even now with an extra
>>> > > GroupByKey (and if you want ordering on top of that, you will need
>>> > > another GBK). Don't know if that is too costly in your case. I would
>>> > > need a bit more detail on handling ES IO write retries to see if it
>>> > > could be simplified. Note that retries occur with or without any
>>> > > failures in your DoFn.
>>> > >
>>> > > The biggest negative with the GBK approach is that it doesn't provide
>>> > > the same guarantees on Flink.
>>> > >
>>> > > I don't see how GroupIntoBatches in Beam provides specific guarantees on
>>> > > deterministic batches.
>>> > >
>>> > > > Thinking about it, SDF is really a way to do it, since the SDF will
>>> > > > manage the bulking and, combined with the runner "retry", it seems to
>>> > > > cover the needs.
>>> > > >
>>> > > > Romain Manni-Bucau
>>> > > >
>>> > > >
>>> > > > 2017-11-17 9:23 GMT+01:00 Eugene Kirpichov <kirpic...@google.com.invalid>:
>>> > > > > I must admit I'm still failing to understand the problem, so let's
>>> > > > > step back even further.
>>> > > > >
>>> > > > > Could you give an example of an IO that is currently difficult to
>>> > > > > implement specifically because of the lack of the feature you're
>>> > > > > talking about?
>>> > > > >
>>> > > > > I'm asking because I've reviewed almost all Beam IOs and don't recall
>>> > > > > seeing a similar problem. Sure, a lot of IOs do batching within a
>>> > > > > bundle, but 1) it doesn't take up much code (granted, it would be
>>> > > > > even easier if Beam did it for us) and 2) I don't remember any of them
>>> > > > > requiring the batches to be deterministic, and I'm having a hard time
>>> > > > > imagining what kind of storage system would be able to deduplicate if
>>> > > > > batches were deterministic but wouldn't be able to deduplicate if
>>> > > > > they weren't.
>>> > > > >
>>> > > > > On Thu, Nov 16, 2017 at 11:50 PM Romain Manni-Bucau <rmannibu...@gmail.com>
>>> > > > > wrote:
>>> > > > >
>>> > > > >> Ok, let me try to step back and summarize what we have today and
>>> > > > >> what I miss:
>>> > > > >>
>>> > > > >> 1. we can handle chunking in Beam through group into batches (or
>>> > > > >> equivalent), but:
>>> > > > >>    - it is not built into the transforms (IOs), and it is controlled
>>> > > > >> from outside the transforms, so there is no way for a transform to
>>> > > > >> do it properly without itself handling a composition of, and links
>>> > > > >> between, multiple DoFns to get notifications and potentially react
>>> > > > >> properly or handle backpressure from its backend
>>> > > > >> 2. there is no restart feature because there is no real state
>>> > > > >> handling at the moment. This sounds fully delegated to the runner,
>>> > > > >> but I was hoping to have more guarantees from the API in use, to be
>>> > > > >> able to restart a pipeline (mainly batch, since it can be irrelevant
>>> > > > >> or delegated to the backend for streams) and handle only
>>> > > > >> non-committed records, so it requires some persistence outside the
>>> > > > >> main IO storages to do it properly
>>> > > > >>    - note this is somewhat similar to the monitoring topic, which
>>> > > > >> lacks persistence ATM, so it may end up with Beam having a pluggable
>>> > > > >> storage for a few concerns
>>> > > > >>
>>> > > > >>
>>> > > > >> Short term, I would be happy with 1 solved properly; long term, I
>>> > > > >> hope 2 will be tackled without workarounds requiring custom wrapping
>>> > > > >> of IOs to use a custom state persistence.
>>> > > > >>
>>> > > > >>
>>> > > > >>
>>> > > > >> Romain Manni-Bucau
>>> > > > >>
>>> > > > >>
>>> > > > >> 2017-11-17 7:44 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>>> > > > >> > Thanks for the explanation. Agreed, we might be talking about
>>> > > > >> > different things using the same wording.
>>> > > > >> >
>>> > > > >> > I'm also struggling to understand the use case (for a generic
>>> > > > >> > DoFn).
>>> > > > >> >
>>> > > > >> > Regards
>>> > > > >> > JB
>>> > > > >> >
>>> > > > >> >
>>> > > > >> > On 11/17/2017 07:40 AM, Eugene Kirpichov wrote:
>>> > > > >> >>
>>> > > > >> >> To avoid spending a lot of time pursuing a false path, I'd like to
>>> > > > >> >> say straight up that SDF is definitely not going to help here,
>>> > > > >> >> despite the fact that its API includes the term "checkpoint". In
>>> > > > >> >> SDF, the "checkpoint" captures the state of processing within a
>>> > > > >> >> single element. If you're applying an SDF to 1000 elements, it
>>> > > > >> >> will, like any other DoFn, be applied to each of them
>>> > > > >> >> independently and in parallel, and you'll have 1000 checkpoints
>>> > > > >> >> capturing the state of processing each of these elements, which
>>> > > > >> >> is probably not what you want.
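To illustrate that an SDF checkpoint lives *within* one element, here is a toy restriction for a single element (plain Java; the class is hypothetical and much simpler than Beam's restriction trackers): after claiming offsets 0 to 3 of [0, 10), a checkpoint splits off the residual [4, 10). Applying an SDF to 1000 elements would give each element its own restriction and its own residual; nothing here groups elements together.

```java
// Toy model of an SDF-style offset restriction for ONE element (hypothetical
// class, not Beam's OffsetRangeTracker): tryClaim walks through [from, to),
// and checkpoint() hands back the unprocessed remainder as a residual.
class OffsetRestriction {
    final long from;
    private long to;
    private long lastClaimed;

    OffsetRestriction(long from, long to) {
        this.from = from;
        this.to = to;
        this.lastClaimed = from - 1;
    }

    // Claims the next offset; returns false once the (possibly shrunk) range is done.
    boolean tryClaim(long offset) {
        if (offset <= lastClaimed) {
            throw new IllegalArgumentException("offsets must be claimed in increasing order");
        }
        if (offset >= to) {
            return false;
        }
        lastClaimed = offset;
        return true;
    }

    // Ends processing after the last claimed offset and returns the residual
    // range [lastClaimed + 1, oldTo) for the runner to resume later.
    long[] checkpoint() {
        long[] residual = {lastClaimed + 1, to};
        to = lastClaimed + 1;
        return residual;
    }

    long to() {
        return to;
    }
}
```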
>>> > > > >> >>
>>> > > > >> >> I'm afraid I still don't understand what kind of checkpoint you
>>> > > > >> >> need, if it is not just deterministic grouping into batches.
>>> > > > >> >> "Checkpoint" is a very broad term and it's very possible that
>>> > > > >> >> everybody in this thread is talking about different things when
>>> > > > >> >> saying it. So it would help if you could give a more concrete
>>> > > > >> >> example: for example, take some IO that you think could be easier
>>> > > > >> >> to write with your proposed API, give the contents of a
>>> > > > >> >> hypothetical PCollection being written to this IO, give the code
>>> > > > >> >> of a hypothetical DoFn implementing the write using your API, and
>>> > > > >> >> explain what you'd expect to happen at runtime.
>>> > > > >> >>
>>> > > > >> >> On Thu, Nov 16, 2017 at 10:33 PM Romain Manni-Bucau
>>> > > > >> >> <rmannibu...@gmail.com>
>>> > > > >> >> wrote:
>>> > > > >> >>
>>> > > > >> >>> @Eugene: yes, and Reuven's alternative too, but it is still
>>> > > > >> >>> 1. relying on timers, 2. not really checkpointed
>>> > > > >> >>>
>>> > > > >> >>> In other words, it seems all the solutions create a replayable
>>> > > > >> >>> chunk of size 1 to fake the lack of chunking in the framework.
>>> > > > >> >>> This always implies chunk handling outside the component
>>> > > > >> >>> (typically before it, for an output). My point is that I think
>>> > > > >> >>> IOs need it "internally", or at least need to control it
>>> > > > >> >>> themselves, since the chunk size is part of the IO handling most
>>> > > > >> >>> of the time.
>>> > > > >> >>>
>>> > > > >> >>> I think JB spoke of the same "group before" trick using
>>> > > > >> >>> restrictions, which can work, I have to admit, if SDFs are
>>> > > > >> >>> implemented by runners. Is there a roadmap/status on that? Last
>>> > > > >> >>> time I checked, SDF was a great API without support :(.
>>> > > > >> >>>
>>> > > > >> >>>
>>> > > > >> >>>
>>> > > > >> >>> Romain Manni-Bucau
>>> > > > >> >>>
>>> > > > >> >>>
>>> > > > >> >>> 2017-11-17 7:25 GMT+01:00 Eugene Kirpichov
>>> > > > >> >>> <kirpic...@google.com.invalid>:
>>> > > > >> >>>>
>>> > > > >> >>>> JB, not sure what you mean? SDFs and triggers are unrelated, and
>>> > > > >> >>>> the post doesn't mention the word. Did you mean something else,
>>> > > > >> >>>> e.g. restriction perhaps? Either way, I don't think SDFs are the
>>> > > > >> >>>> solution here; SDFs have to do with the ability to split the
>>> > > > >> >>>> processing of *a single element* over multiple calls, whereas
>>> > > > >> >>>> Romain, I think, is asking for repeatable grouping of *multiple*
>>> > > > >> >>>> elements.
>>> > > > >> >>>>
>>> > > > >> >>>> Romain - does
>>> > > > >> >>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
>>> > > > >> >>>> do what you want?
>>> > > > >> >>>> On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré <j...@nanthrax.net>
>>> > > > >> >>>> wrote:
>>> > > > >> >>>>
>>> > > > >> >>>>> It sounds like the "Trigger" in the Splittable DoFn, no?
>>> > > > >> >>>>>
>>> > > > >> >>>>> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
>>> > > > >> >>>>>
>>> > > > >> >>>>> Regards
>>> > > > >> >>>>> JB
>>> > > > >> >>>>>
>>> > > > >> >>>>>
>>> > > > >> >>>>> On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote:
>>> > > > >> >>>>>>
>>> > > > >> >>>>>> It gives the fn/transform the ability to save a state that it
>>> > > > >> >>>>>> can get back on "restart" / whatever unit we can use (probably
>>> > > > >> >>>>>> runner dependent?). Without that, you need to rewrite all IO
>>> > > > >> >>>>>> usage with something like the previous pattern, which makes the
>>> > > > >> >>>>>> IO not self-sufficient and considerably raises the entry cost
>>> > > > >> >>>>>> and usage cost of Beam.
>>> > > > >> >>>>>>
>>> > > > >> >>>>>> In my mind, it is exactly what jbatch/spring-batch uses, but
>>> > > > >> >>>>>> adapted to the Beam (stream in particular) case.
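For readers less familiar with the jbatch/Spring Batch model referenced here, its core loop can be sketched as follows. This is a simplified in-memory model, not either framework's API: each chunk of `commitInterval` items is written and the reader position is recorded, and the two are assumed to commit atomically (the real frameworks do this within one transaction), so a restart resumes after the last committed chunk instead of replaying everything.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of chunk-oriented processing as in JSR 352 (jbatch) / Spring
// Batch: items are written in chunks of commitInterval, and the reader
// position is recorded after each chunk, so a restart only replays the chunk
// that failed. In-memory only; a real job persists the checkpoint and commits
// it atomically with the chunk's writes.
class ChunkJob {
    final List<String> committed = new ArrayList<>(); // stands in for the target store
    long checkpoint = 0; // index of the first item not yet committed

    // Processes input from the last checkpoint. failAtIndex simulates a crash
    // while writing the chunk that contains that item (-1 means no failure).
    boolean run(List<String> input, int commitInterval, long failAtIndex) {
        while (checkpoint < input.size()) {
            int start = (int) checkpoint;
            int end = Math.min(start + commitInterval, input.size());
            if (failAtIndex >= start && failAtIndex < end) {
                return false; // chunk rolled back: nothing written, checkpoint unchanged
            }
            committed.addAll(input.subList(start, end)); // write the chunk...
            checkpoint = end;                             // ...and record progress
        }
        return true;
    }
}
```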
>>> > > > >> >>>>>>
>>> > > > >> >>>>>> Romain Manni-Bucau
>>> > > > >> >>>>>>
>>> > > > >> >>>>>>
>>> > > > >> >>>>>> 2017-11-17 6:49 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
>>> > > > >> >>>>>>>
>>> > > > >> >>>>>>> Romain,
>>> > > > >> >>>>>>>
>>> > > > >> >>>>>>> Can you define what you mean by checkpoint? What are the
>>> > > > >> >>>>>>> semantics, what does it accomplish?
>>> > > > >> >>>>>>>
>>> > > > >> >>>>>>> Reuven
>>> > > > >> >>>>>>>
>>> > > > >> >>>>>>> On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
>>> > > > >> >>>>>>> wrote:
>>> > > > >> >>>>>>>
>>> > > > >> >>>>>>>> Yes, what I proposed earlier was:
>>> > > > >> >>>>>>>>
>>> > > > >> >>>>>>>> I. checkpoint marker:
>>> > > > >> >>>>>>>>
>>> > > > >> >>>>>>>> @AnyBeamAnnotation
>>> > > > >> >>>>>>>> @CheckpointAfter
>>> > > > >> >>>>>>>> public void someHook(SomeContext ctx);
>>> > > > >> >>>>>>>>
>>> > > > >> >>>>>>>>
>>> > > > >> >>>>>>>> II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new CountingAlgo()))
>>> > > > >> >>>>>>>>
>>> > > > >> >>>>>>>> III. (I like this one less)
>>> > > > >> >>>>>>>>
>>> > > > >> >>>>>>>> // in the DoFn
>>> > > > >> >>>>>>>> @CheckpointTester
>>> > > > >> >>>>>>>> public boolean shouldCheckpoint();
>>> > > > >> >>>>>>>>
>>> > > > >> >>>>>>>> IV. @Checkpointer Serializable getCheckpoint(); in the DoFn, per
>>> > > > >> >>>>>>>> element
>>> > > > >> >>>>>>>>
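Option II could be read as follows in plain Java. Neither `CheckpointAlgorithm` nor `CountingAlgo` exists in Beam; the interface shape (one call per processed element, returning whether to checkpoint now) is an assumption made for illustration of how a runner might consult such a policy.

```java
// Hypothetical policy interface for proposal II; not part of Beam.
interface CheckpointAlgorithm {
    // Called once after each successfully processed element.
    boolean shouldCheckpoint();
}

// Hypothetical policy: checkpoint after every `every` elements.
class CountingAlgo implements CheckpointAlgorithm {
    private final int every;
    private int seen = 0;

    CountingAlgo(int every) {
        if (every < 1) {
            throw new IllegalArgumentException("every must be >= 1");
        }
        this.every = every;
    }

    @Override
    public boolean shouldCheckpoint() {
        seen++;
        if (seen == every) {
            seen = 0; // start counting towards the next checkpoint
            return true;
        }
        return false;
    }
}
```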
>>> > > > >> >>>>>>>>
>>> > > > >> >>>>>>>>
>>> > > > >> >>>>>>>>
>>> > > > >> >>>>>>>>
>>> > > > >> >>>>>>>> Romain Manni-Bucau
>>> > > > >> >>>>>>>>
>>> > > > >> >>>>>>>>
>>> > > > >> >>>>>>>> 2017-11-17 6:06 GMT+01:00 Raghu Angadi <rang...@google.com.invalid>:
>>> > > > >> >>>>>>>>>
>>> > > > >> >>>>>>>>> How would you define it (rough API is fine)? Without more
>>> > > > >> >>>>>>>>> details, it is not easy to see wider applicability and
>>> > > > >> >>>>>>>>> feasibility in runners.
>>> > > > >> >>>>>>>>>
>>> > > > >> >>>>>>>>> On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
>>> > > > >> >>>>>>>>> wrote:
>>> > > > >> >>>>>>>>>
>>> > > > >> >>>>>>>>>> This is a fair summary of the current state, but also of
>>> > > > >> >>>>>>>>>> where Beam can bring a very strong added value and make big
>>> > > > >> >>>>>>>>>> data great and smooth.
>>> > > > >> >>>>>>>>>>
>>> > > > >> >>>>>>>>>> Instead of this replay feature, isn't checkpointing
>>> > > > >> >>>>>>>>>> desirable? In particular with SDF, no?
>>> > > > >> >>>>>>>>>>
>>> > > > >> >>>>>>>>>>
>>> > > > >> >>>>>>>>>> On 16 Nov 2017 19:50, "Raghu Angadi" <rang...@google.com.invalid>
>>> > > > >> >>>>>>>>>> wrote:
>>> > > > >> >>>>>>>>>>
>>> > > > >> >>>>>>>>>>> Core issue here is that there is no explicit concept of
>>> > > > >> >>>>>>>>>>> 'checkpoint' in Beam (UnboundedSource has a method
>>> > > > >> >>>>>>>>>>> 'getCheckpointMark', but that refers to the checkpoint on the
>>> > > > >> >>>>>>>>>>> external source). Runners do checkpoint internally as an
>>> > > > >> >>>>>>>>>>> implementation detail. Flink's checkpoint model is entirely
>>> > > > >> >>>>>>>>>>> different from Dataflow's and Spark's.
>>> > > > >> >>>>>>>>>>>
>>> > > > >> >>>>>>>>>>> @StableReplay helps, but it does not explicitly talk about a
>>> > > > >> >>>>>>>>>>> checkpoint by design.
>>> > > > >> >>>>>>>>>>>
>>> > > > >> >>>>>>>>>>> If you are looking to achieve some guarantees with a
>>> > > > >> >>>>>>>>>>> sink/DoFn, I think it is better to start with the
>>> > > > >> >>>>>>>>>>> requirements. I worked on an exactly-once sink for Kafka (see
>>> > > > >> >>>>>>>>>>> KafkaIO.write().withEOS()), where we essentially reshard the
>>> > > > >> >>>>>>>>>>> elements and assign sequence numbers to elements within each
>>> > > > >> >>>>>>>>>>> shard. Duplicates in replays are avoided based on these
>>> > > > >> >>>>>>>>>>> sequence numbers. The DoFn state API is used to buffer
>>> > > > >> >>>>>>>>>>> out-of-order replays. The implementation strategy works in
>>> > > > >> >>>>>>>>>>> Dataflow but not in Flink, which has a horizontal checkpoint.
>>> > > > >> >>>>>>>>>>> KafkaIO checks for compatibility.
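The sequence-number idea described here can be modeled for a single shard as below. This is a loose plain-Java illustration, not KafkaIO's implementation (which is considerably more involved and keeps its state durably): each record carries a per-shard sequence number, replays of already-written numbers are dropped, and early arrivals are buffered until the gap before them is filled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Loose single-shard model of sequence-number dedup (hypothetical class, not
// KafkaIO code). In a real sink, nextSeq and pending would live in durable state.
class SequencedShardWriter {
    private long nextSeq = 0;                                  // next number to write
    private final Map<Long, String> pending = new TreeMap<>(); // early (out-of-order) arrivals
    final List<String> written = new ArrayList<>();

    void receive(long seq, String record) {
        if (seq < nextSeq) {
            return; // already written: a replay, so drop it
        }
        pending.put(seq, record); // a replay of a still-pending record just overwrites it
        // Write every record that is now contiguous with what was already written.
        while (pending.containsKey(nextSeq)) {
            written.add(pending.remove(nextSeq));
            nextSeq++;
        }
    }
}
```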
>>> > > > >> >>>>>>>>>>>
>>> > > > >> >>>>>>>>>>> On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau <rmannibu...@gmail.com>
>>> > > > >> >>>>>>>>>>> wrote:
>>> > > > >> >>>>>>>>>>>
>>> > > > >> >>>>>>>>>>>> Hi guys,
>>> > > > >> >>>>>>>>>>>>
>>> > > > >> >>>>>>>>>>>> The subject is a bit provocative, but the topic is real and
>>> > > > >> >>>>>>>>>>>> comes up again and again with Beam usage: how can a DoFn
>>> > > > >> >>>>>>>>>>>> handle some "chunking"?
>>> > > > >> >>>>>>>>>>>>
>>> > > > >> >>>>>>>>>>>> The need is to be able to commit every N records, with N not
>>> > > > >> >>>>>>>>>>>> too big.
>>> > > > >> >>>>>>>>>>>>
>>> > > > >> >>>>>>>>>>>> The natural API for that in Beam is the bundle one, but
>>> > > > >> >>>>>>>>>>>> bundles are not reliable since they can be very small (Flink)
>>> > > > >> >>>>>>>>>>>> - we can say it is "ok" even if it has some perf impact - or
>>> > > > >> >>>>>>>>>>>> too big (Spark does full size / #workers).
>>> > > > >> >>>>>>>>>>>>
>>> > > > >> >>>>>>>>>>>> The workaround is what we see in the ES I/O: a maxSize which
>>> > > > >> >>>>>>>>>>>> does an eager flush. The issue is that the checkpoint is then
>>> > > > >> >>>>>>>>>>>> not respected, and you can process the same records multiple
>>> > > > >> >>>>>>>>>>>> times.
>>> > > > >> >>>>>>>>>>>>
>>> > > > >> >>>>>>>>>>>> Any plan to make this API reliable and controllable from a
>>> > > > >> >>>>>>>>>>>> Beam point of view (at least in a max manner)?
>>> > > > >> >>>>>>>>>>>>
>>> > > > >> >>>>>>>>>>>> Thanks,
>>> > > > >> >>>>>>>>>>>> Romain Manni-Bucau
>>> > > > >> >>>>>>>>>>>>
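The duplication problem described in the original post can be reproduced with a toy model (plain Java; no Beam or Elasticsearch APIs, and the names are hypothetical): a writer eagerly flushes every `maxSize` records into an append-only store, the first attempt at the bundle fails after one flush, and the runner retries the whole bundle.

```java
import java.util.ArrayList;
import java.util.List;

// Toy reproduction (hypothetical names): eager flush every maxSize records,
// a failure during the first attempt, and a retry of the whole bundle.
class EagerFlushReplay {
    static List<String> writeWithRetry(List<String> bundle, int maxSize, int failAtIndex) {
        List<String> store = new ArrayList<>(); // append-only: flushed data is never undone
        for (int attempt = 0; attempt < 2; attempt++) {
            List<String> buffer = new ArrayList<>();
            try {
                for (int i = 0; i < bundle.size(); i++) {
                    if (attempt == 0 && i == failAtIndex) {
                        throw new IllegalStateException("worker lost");
                    }
                    buffer.add(bundle.get(i));
                    if (buffer.size() >= maxSize) { // eager flush, like a maxSize setting
                        store.addAll(buffer);
                        buffer.clear();
                    }
                }
                store.addAll(buffer); // final flush at the end of the bundle
                return store;
            } catch (IllegalStateException e) {
                // The runner retries the whole bundle; earlier flushes stay in the store.
            }
        }
        return store;
    }
}
```

With bundle `a..e`, `maxSize` 2 and a failure at index 3, the store ends up as `a, b, a, b, c, d, e`. With insert-with-id writes instead of appends, the replayed `a, b` would overwrite rather than duplicate, which is the counter-argument Eugene makes in this thread.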
>>> > > > >> >>>>>>>>>>>
>>> > > > >> >>>>>>>>>>
>>> > > > >> >>>>>>>>
>>> > > > >> >>>>>
>>> > > > >> >>>>> --
>>> > > > >> >>>>> Jean-Baptiste Onofré
>>> > > > >> >>>>> jbono...@apache.org
>>> > > > >> >>>>> http://blog.nanthrax.net
>>> > > > >> >>>>> Talend - http://www.talend.com
>>> > > > >> >>>>>
>>> > > > >> >>>
>>> > > > >> >>
>>> > > > >> >
>>> > > > >>
>>> > > >
>>> > >
>>> >
>>>