I don't think it belongs in PipelineOptions, as bundle size is always a
runner thing.

We could consider adding a new generic RunnerOptions, but I'm not
convinced all runners can actually support this.
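
For concreteness, a rough sketch of what such a generic option could look
like if we went that way (the interface name and default are made up; today
only the Flink options expose getMaxBundleSize):

  import org.apache.beam.sdk.options.Default;
  import org.apache.beam.sdk.options.Description;
  import org.apache.beam.sdk.options.PipelineOptions;

  /** Hypothetical generic option; a runner that cannot honor it would ignore it or fail fast. */
  public interface BundleSizeOptions extends PipelineOptions {
    @Description("Maximum number of elements a runner may put in a single bundle.")
    @Default.Long(1000)
    Long getMaxBundleSize();

    void setMaxBundleSize(Long maxBundleSize);
  }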

On Thu, Nov 30, 2017 at 6:09 AM, Romain Manni-Bucau <rmannibu...@gmail.com>
wrote:

> Guys,
>
> what about moving getMaxBundleSize from flink options to pipeline
> options? I think all runners can support it, right? The spark code
> probably needs the merge of the v2 before this can be implemented, but I
> don't see any blocker.
>
> wdyt?
>
> Romain Manni-Bucau
> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>
>
> 2017-11-19 8:19 GMT+01:00 Romain Manni-Bucau <rmannibu...@gmail.com>:
> > @Eugene: "workaround" as specific to the IO each time and therefore
> > still highlight a lack in the core.
> >
> > Other comments inline
> >
> >
> > 2017-11-19 7:40 GMT+01:00 Robert Bradshaw <rober...@google.com.invalid>:
> >> There is a possible fourth issue that we don't handle well: efficiency.
> >> For very large bundles, it may be advantageous to avoid replaying a
> >> bunch of idempotent operations if there were a way to record which ones
> >> we're sure went through. Not sure if that's the issue here (though one
> >> could possibly do this with SDFs, by preemptively returning periodically
> >> before an element (or portion thereof) is done).
> >
> > +1, it also leads to the IO handling its own chunking/bundles and
> > therefore solves all issues at once.
> >
> >>
> >> On Sat, Nov 18, 2017 at 6:58 PM, Eugene Kirpichov <
> >> kirpic...@google.com.invalid> wrote:
> >>
> >>> I disagree that the usage of document id in ES is a "workaround" - it
> >>> does not address any *accidental* complexity
> >>> <https://en.wikipedia.org/wiki/No_Silver_Bullet> coming from
> >>> shortcomings of Beam, it addresses the *essential* complexity that a
> >>> distributed system forces one to take as a fact of nature: the same
> >>> write (mutation) will happen multiple times, so if you want a mutation
> >>> to happen "as-if" it happened exactly once, the mutation itself must be
> >>> idempotent <https://en.wikipedia.org/wiki/Idempotence>. Insert-with-id
> >>> (upsert <https://en.wikipedia.org/wiki/Merge_(SQL)>) is a classic
> >>> example of an idempotent mutation, and it's very good that
> >>> Elasticsearch provides it - if it didn't, no matter how good of an API
> >>> Beam had, achieving exactly-once writes would be theoretically
> >>> impossible. Are we in agreement on this so far?
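
As an illustration of such an idempotent mutation in a DoFn (a minimal
sketch only - EsClient, upsert and the Order type are placeholders, not the
Beam ElasticsearchIO API):

  class UpsertFn extends DoFn<Order, Void> {
    private transient EsClient client; // placeholder REST client

    @Setup
    public void setup() {
      client = EsClient.create("http://es:9200"); // placeholder endpoint
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      Order o = c.element();
      // Stable id derived from the element: replaying the same element
      // rewrites the same document instead of creating a duplicate.
      client.upsert("orders", o.getOrderId(), o.toJson());
    }
  }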
> >>>
> >>> Next: you seem to be discussing 3 issues together, all of which are
> >>> valid issues, but they seem unrelated to me:
> >>> 1. Exactly-once mutation
> >>> 2. Batching multiple mutations into one RPC.
> >>> 3. Backpressure
> >>>
> >>> #1: was considered above. The system the IO is talking to has to
> >>> support idempotent mutations, in an IO-specific way, and the IO has to
> >>> take advantage of them, in the IO-specific way - end of story.
> >
> > Agree but don't forget the original point was about "chunks" and not
> > individual records.
> >
> >>>
> >>> #2: a batch of idempotent operations is also idempotent, so this
> >>> doesn't add anything new semantically. Syntactically - Beam already
> >>> allows you to write your own batching by notifying you of permitted
> >>> batch boundaries (Start/FinishBundle). Sure, it could do more, but from
> >>> my experience the batching in IOs I've seen is one of the easiest and
> >>> least error-prone parts, so I don't see something worth an extended
> >>> discussion here.
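
The bundle-scoped batching pattern referred to here looks roughly like this
(a sketch only - the batch size, BackendClient and bulkWrite are
placeholders, not an existing IO):

  import java.util.ArrayList;
  import java.util.List;
  import org.apache.beam.sdk.transforms.DoFn;

  class BatchingWriteFn extends DoFn<String, Void> {
    private static final int MAX_BATCH_SIZE = 500;
    private transient BackendClient client; // placeholder external client
    private transient List<String> batch;

    @StartBundle
    public void startBundle() {
      batch = new ArrayList<>();
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      batch.add(c.element());
      if (batch.size() >= MAX_BATCH_SIZE) {
        flush(); // eager flush inside the bundle
      }
    }

    @FinishBundle
    public void finishBundle() {
      flush(); // the runner-defined bundle boundary is the last flush point
    }

    private void flush() {
      if (!batch.isEmpty()) {
        client.bulkWrite(batch); // one RPC for the whole batch
        batch.clear();
      }
    }
  }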
> >
> > "Beam already allows you to
> >  write your own batching by notifying you of permitted batch boundaries
> >  (Start/FinishBundle)"
> >
> > Is wrong since the bundle is potentially the whole PCollection (spark)
> > so this is not even an option until you use the SDF (back to the same
> > point).
> > Once again the API looks fine but no implementation makes it true. It
> > would be easy to change it in spark, flink can be ok since it targets
> > more the streaming case, not sure of others, any idea?
> >
> >
> >>>
> >>> #3: handling backpressure is a complex problem with multiple facets:
> >>> 1) how do you know you're being throttled, and by how much are you
> >>> exceeding the external system's capacity?
> >
> > This is the whole point of backpressure: the system sends it back to
> > you (header-like or status technique in general).
> >
> >>> 2) how do you communicate this signal to the
> >>> runner?
> >
> > You are a client, so you get the meta in the response - whatever the
> > technology.
> >
> >>> 3) what does the runner do in response?
> >
> > The runner does nothing, but the IO adapts its handling as mentioned
> > before (wait and retry, skip, ... depending on the config).
> >
> >>> 4) how do you wait until
> >>> it's ok to try again?
> >
> > This is one point to probably enhance in beam, but waiting in the
> > processing is an option if the source has some buffering; otherwise it
> > requires a buffer fallback and a max size if the wait mode is
> > activated.
> >
> >>> You seem to be advocating for solving one facet of this problem, which
> >>> is: you want it to be possible to signal to the runner "I'm being
> >>> throttled, please end the bundle", right? If so - I think this (ending
> >>> the bundle) is unnecessary: the DoFn can simply do an exponential
> >>> back-off sleep loop.
> >
> > Agree, I never said the runner should know, but GBK+output doesn't work
> > because you don't own the GBK.
> >
> >>> This is e.g. what DatastoreIO does
> >>> <https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java#L1318>
> >>> and this is in general how most systems I've seen handle backpressure.
> >>> Is there something I'm missing? In particular, is there any compelling
> >>> reason why you think it'd be beneficial e.g. for DatastoreIO to commit
> >>> the results of the bundle so far before processing other elements?
> >
> > It was more about ensuring you validate early a subset of the whole
> > bundle and avoid reprocessing it if it fails later.
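
The exponential back-off loop suggested above would look roughly like this
inside a DoFn (sketch only - writeOnce() and ThrottledException are
placeholders for whatever the backend client exposes):

  @ProcessElement
  public void processElement(ProcessContext c) throws InterruptedException {
    long backoffMillis = 100;
    while (true) {
      try {
        writeOnce(c.element()); // idempotent write to the external system
        return;
      } catch (ThrottledException e) {
        // The backend says to slow down: wait and retry within the same
        // bundle, without ending the bundle or involving the runner.
        Thread.sleep(backoffMillis);
        backoffMillis = Math.min(backoffMillis * 2, 60_000L);
      }
    }
  }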
> >
> >
> > So to summarize I see 2 outcomes:
> >
> > 1. impl SDF in all runners
> > 2. make the bundle size upper bounded - through a pipeline option - in
> > all runners; not sure this one is doable everywhere since I mainly
> > checked the spark case
> >
> >>>
> >>> Again, it might be that I'm still misunderstanding what you're trying
> >>> to say. One of the things it would help to clarify would be - exactly
> >>> what do you mean by "how batch frameworks solved that for years": can
> >>> you point at an existing API in some other framework that achieves what
> >>> you want?
> >>>
> >>> On Sat, Nov 18, 2017 at 1:02 PM Romain Manni-Bucau
> >>> <rmannibu...@gmail.com> wrote:
> >>>
> >>> > Eugene, point - and issue with a single sample - is you can always
> >>> > find *workarounds* on a case by case basis, as the id one with ES,
> >>> > but beam doesn't solve the problem as a framework.
> >>> >
> >>> > From my past, I clearly don't see how batch frameworks solved that
> >>> > for years and beam is not able to do it - keep in mind it is the same
> >>> > kind of techno, it just uses different sources and bigger clusters,
> >>> > so no real reason to not have the same feature quality. The only
> >>> > potential reason I see is there is no tracking of the state into the
> >>> > cluster - e2e. But I don't see why there wouldn't be. Do I miss
> >>> > something here?
> >>> >
> >>> > An example could be: take a github crawler computing stats on the
> >>> > whole github repos which is based on a rest client as example. You
> >>> > will need to handle the rate limit and likely want to "commit" each
> >>> > time you reach a rate limit, with likely some buffering strategy with
> >>> > a max size before really waiting. How do you do it with a GBK
> >>> > independent of your dofn? You are not able to compose correctly the
> >>> > fn between them :(.
> >>> >
> >>> >
> >>> > Le 18 nov. 2017 20:48, "Eugene Kirpichov"
> <kirpic...@google.com.invalid>
> >>> a
> >>> > écrit :
> >>> >
> >>> > After giving this thread my best attempt at understanding exactly
> what is
> >>> > the problem and the proposed solution, I'm afraid I still fail to
> >>> > understand both. To reiterate, I think the only way to make progress
> here
> >>> > is to be more concrete: (quote) take some IO that you think could be
> >>> easier
> >>> > to write with your proposed API, give the contents of a hypothetical
> >>> > PCollection being written to this IO, give the code of a hypothetical
> >>> DoFn
> >>> > implementing the write using your API, and explain what you'd expect
> to
> >>> > happen at runtime. I'm going to re-engage in this thread after such
> an
> >>> > example is given.
> >>> >
> >>> > On Sat, Nov 18, 2017, 5:00 AM Romain Manni-Bucau <
> rmannibu...@gmail.com>
> >>> > wrote:
> >>> >
> >>> > > First, bundle retry is unusable with some runners like spark,
> >>> > > where the bundle size is the collection size / number of workers.
> >>> > > This means a user can't use the bundle API or feature reliably and
> >>> > > portably - which is beam's promise. Aligning chunking and bundles
> >>> > > would guarantee that, but it can be not desired; that is why I
> >>> > > thought it can be another feature.
> >>> > >
> >>> > > GBK works until the IO knows about that, and both concepts are not
> >>> > > always orthogonal - backpressure-like systems are a trivial common
> >>> > > example. This means the IO (dofn) must be able to do it itself at
> >>> > > some point.
> >>> > >
> >>> > > Also note the GBK works only if the IO can take a list, which is
> >>> > > never the case today.
> >>> > >
> >>> > > Big questions for me are: is SDF the way to go since it provides
> >>> > > the needed API but is not yet supported? What about existing IO?
> >>> > > Should beam provide an auto wrapping of dofn for that
> >>> > > pre-aggregated support and simulate bundles to the actual IO impl
> >>> > > to keep the existing API?
> >>> > >
> >>> > >
> >>> > > Le 17 nov. 2017 19:20, "Raghu Angadi" <rang...@google.com.invalid>
> a
> >>> > > écrit :
> >>> > >
> >>> > > On Fri, Nov 17, 2017 at 1:02 AM, Romain Manni-Bucau <
> >>> > rmannibu...@gmail.com
> >>> > > >
> >>> > > wrote:
> >>> > >
> >>> > > > Yep, just take the ES IO: if a part of a bundle fails you are in
> >>> > > > an unmanaged state. This is the case for all O (of IO ;)). The
> >>> > > > issue is not much about "1" (the code it takes) but more the fact
> >>> > > > it doesn't integrate with runner features and retries
> >>> > > > potentially: what happens if a bundle has a failure? => undefined
> >>> > > > today. 2. I'm fine with it as long as we know exactly what
> >>> > > > happens when we restart after a bundle failure. With ES the
> >>> > > > timestamp can be used for instance.
> >>> > > >
> >>> > >
> >>> > > This deterministic batching can be achieved even now with an extra
> >>> > > GroupByKey (and if you want ordering on top of that, you will need
> >>> > > another GBK). I don't know if that is too costly in your case. I
> >>> > > would need a bit more details on handling ES IO write retries to
> >>> > > see if it could be simplified. Note that retries occur with or
> >>> > > without any failures in your DoFn.
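
A rough sketch of that extra-GroupByKey shape (illustrative only - Document,
NUM_SHARDS and client.bulkWrite are placeholders; the deterministic part
comes from deriving the key from the element itself):

  docs
      .apply("KeyByHash", ParDo.of(new DoFn<Document, KV<Integer, Document>>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          Document d = c.element();
          // deterministic key: re-executions build the same groups
          c.output(KV.of(Math.floorMod(d.getId().hashCode(), NUM_SHARDS), d));
        }
      }))
      .apply(GroupByKey.<Integer, Document>create())
      .apply("WriteBatches", ParDo.of(new DoFn<KV<Integer, Iterable<Document>>, Void>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          client.bulkWrite(c.element().getValue()); // one bulk request per group
        }
      }));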
> >>> > >
> >>> > > The biggest negative with the GBK approach is that it doesn't
> >>> > > provide the same guarantees on Flink.
> >>> > >
> >>> > > I don't see how GroupIntoBatches in Beam provides specific
> >>> > > guarantees on deterministic batches.
> >>> > >
> >>> > > > Thinking about it the SDF is really a way to do it since the SDF
> >>> > > > will manage the bulking and associated with the runner "retry" it
> >>> > > > seems it covers the needs.
> >>> > > >
> >>> > > > Romain Manni-Bucau
> >>> > > > @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >>> > > >
> >>> > > >
> >>> > > > 2017-11-17 9:23 GMT+01:00 Eugene Kirpichov
> >>> > > > <kirpic...@google.com.invalid>:
> >>> > > > > I must admit I'm still failing to understand the problem, so
> >>> > > > > let's step back even further.
> >>> > > > >
> >>> > > > > Could you give an example of an IO that is currently difficult
> >>> > > > > to implement specifically because of lack of the feature you're
> >>> > > > > talking about?
> >>> > > > >
> >>> > > > > I'm asking because I've reviewed almost all Beam IOs and don't
> >>> > > > > recall seeing a similar problem. Sure, a lot of IOs do batching
> >>> > > > > within a bundle, but 1) it doesn't take up much code (granted,
> >>> > > > > it would be even easier if Beam did it for us) and 2) I don't
> >>> > > > > remember any of them requiring the batches to be deterministic,
> >>> > > > > and I'm having a hard time imagining what kind of storage
> >>> > > > > system would be able to deduplicate if batches were
> >>> > > > > deterministic but wouldn't be able to deduplicate if they
> >>> > > > > weren't.
> >>> > > > >
> >>> > > > > On Thu, Nov 16, 2017 at 11:50 PM Romain Manni-Bucau
> >>> > > > > <rmannibu...@gmail.com> wrote:
> >>> > > > >
> >>> > > > >> Ok, let me try to step back and summarize what we have today
> >>> > > > >> and what I miss:
> >>> > > > >>
> >>> > > > >> 1. we can handle chunking in beam through group-in-batch (or
> >>> > > > >> equivalent) but:
> >>> > > > >>    > it is not built into the transforms (IO) and it is
> >>> > > > >> controlled from outside the transforms, so there is no way for
> >>> > > > >> a transform to do it properly without handling itself a
> >>> > > > >> composition and links between multiple dofns to have
> >>> > > > >> notifications and potentially react properly or handle
> >>> > > > >> backpressure from its backend
> >>> > > > >> 2. there is no restart feature because there is no real state
> >>> > > > >> handling at the moment. This sounds fully delegated to the
> >>> > > > >> runner, but I was hoping to have more guarantees from the used
> >>> > > > >> API to be able to restart a pipeline (mainly batch, since it
> >>> > > > >> can be irrelevant or delegated to the backend for streams) and
> >>> > > > >> handle only uncommitted records, so it requires some
> >>> > > > >> persistence outside the main IO storages to do it properly
> >>> > > > >>    > note this is somehow similar to the monitoring topic,
> >>> > > > >> which misses persistence ATM, so it can end up with beam
> >>> > > > >> having a pluggable storage for a few concerns
> >>> > > > >>
> >>> > > > >>
> >>> > > > >> Short term I would be happy with 1 solved properly; long term
> >>> > > > >> I hope 2 will be tackled without workarounds requiring custom
> >>> > > > >> wrapping of IO to use a custom state persistence.
> >>> > > > >>
> >>> > > > >>
> >>> > > > >>
> >>> > > > >> Romain Manni-Bucau
> >>> > > > >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >>> > > > >>
> >>> > > > >>
> >>> > > > >> 2017-11-17 7:44 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
> >>> > > > >> > Thanks for the explanation. Agree, we might talk about
> >>> > > > >> > different things using the same wording.
> >>> > > > >> >
> >>> > > > >> > I'm also struggling to understand the use case (for a
> >>> > > > >> > generic DoFn).
> >>> > > > >> >
> >>> > > > >> > Regards
> >>> > > > >> > JB
> >>> > > > >> >
> >>> > > > >> >
> >>> > > > >> > On 11/17/2017 07:40 AM, Eugene Kirpichov wrote:
> >>> > > > >> >>
> >>> > > > >> >> To avoid spending a lot of time pursuing a false path, I'd
> >>> > > > >> >> like to say straight up that SDF is definitely not going to
> >>> > > > >> >> help here, despite the fact that its API includes the term
> >>> > > > >> >> "checkpoint". In SDF, the "checkpoint" captures the state of
> >>> > > > >> >> processing within a single element. If you're applying an
> >>> > > > >> >> SDF to 1000 elements, it will, like any other DoFn, be
> >>> > > > >> >> applied to each of them independently and in parallel, and
> >>> > > > >> >> you'll have 1000 checkpoints capturing the state of
> >>> > > > >> >> processing each of these elements, which is probably not
> >>> > > > >> >> what you want.
> >>> > > > >> >>
> >>> > > > >> >> I'm afraid I still don't understand what kind of checkpoint
> >>> > > > >> >> you need, if it is not just deterministic grouping into
> >>> > > > >> >> batches. "Checkpoint" is a very broad term and it's very
> >>> > > > >> >> possible that everybody in this thread is talking about
> >>> > > > >> >> different things when saying it. So it would help if you
> >>> > > > >> >> could give a more concrete example: for example, take some
> >>> > > > >> >> IO that you think could be easier to write with your
> >>> > > > >> >> proposed API, give the contents of a hypothetical
> >>> > > > >> >> PCollection being written to this IO, give the code of a
> >>> > > > >> >> hypothetical DoFn implementing the write using your API,
> >>> > > > >> >> and explain what you'd expect to happen at runtime.
> >>> > > > >> >>
> >>> > > > >> >> On Thu, Nov 16, 2017 at 10:33 PM Romain Manni-Bucau
> >>> > > > >> >> <rmannibu...@gmail.com> wrote:
> >>> > > > >> >>
> >>> > > > >> >>> @Eugene: yes, and the other alternative of Reuven too, but
> >>> > > > >> >>> it is still 1. relying on timers, 2. not really
> >>> > > > >> >>> checkpointed.
> >>> > > > >> >>>
> >>> > > > >> >>> In other words it seems all solutions are to create a chunk
> >>> > > > >> >>> of size 1 and replayable, to fake the lack of chunking in
> >>> > > > >> >>> the framework. This always implies a chunk handling outside
> >>> > > > >> >>> the component (typically before, for an output). My point
> >>> > > > >> >>> is I think IOs need it in their own "internal", or at least
> >>> > > > >> >>> control it themselves, since the chunk size is part of the
> >>> > > > >> >>> IO handling most of the time.
> >>> > > > >> >>>
> >>> > > > >> >>> I think JB spoke of the same "group before" trick using
> >>> > > > >> >>> restrictions, which can work I have to admit, if SDF is
> >>> > > > >> >>> implemented by runners. Is there a roadmap/status on that?
> >>> > > > >> >>> Last time I checked SDF was a great API without support :(.
> >>> > > > >> >>>
> >>> > > > >> >>>
> >>> > > > >> >>>
> >>> > > > >> >>> Romain Manni-Bucau
> >>> > > > >> >>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >>> > > > >> >>>
> >>> > > > >> >>>
> >>> > > > >> >>> 2017-11-17 7:25 GMT+01:00 Eugene Kirpichov
> >>> > > > >> >>> <kirpic...@google.com.invalid>:
> >>> > > > >> >>>>
> >>> > > > >> >>>> JB, not sure what you mean? SDFs and triggers are
> >>> > > > >> >>>> unrelated, and the post doesn't mention the word. Did you
> >>> > > > >> >>>> mean something else, e.g. restriction perhaps? Either way
> >>> > > > >> >>>> I don't think SDFs are the solution here; SDFs have to do
> >>> > > > >> >>>> with the ability to split the processing of *a single
> >>> > > > >> >>>> element* over multiple calls, whereas Romain I think is
> >>> > > > >> >>>> asking for repeatable grouping of *multiple* elements.
> >>> > > > >> >>>>
> >>> > > > >> >>>> Romain - does
> >>> > > > >> >>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
> >>> > > > >> >>>> do what you want?
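
For reference, GroupIntoBatches is applied to a keyed PCollection and emits
batches of at most the requested size; a minimal sketch of using it for a
bulk write (the key, batch size and client.bulkWrite are placeholders):

  PCollection<KV<String, Record>> keyed = ...;
  keyed
      .apply(GroupIntoBatches.<String, Record>ofSize(100))
      .apply(ParDo.of(new DoFn<KV<String, Iterable<Record>>, Void>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          client.bulkWrite(c.element().getValue()); // one request per batch
        }
      }));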
> >>> > > > >> >>>>
> >>> > > > >> >>>> On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré
> >>> > > > >> >>>> <j...@nanthrax.net> wrote:
> >>> > > > >> >>>>
> >>> > > > >> >>>>> It sounds like the "Trigger" in the Splittable DoFn, no
> ?
> >>> > > > >> >>>>>
> >>> > > > >> >>>>> https://beam.apache.org/blog/
> 2017/08/16/splittable-do-fn.
> >>> html
> >>> > > > >> >>>>>
> >>> > > > >> >>>>> Regards
> >>> > > > >> >>>>> JB
> >>> > > > >> >>>>>
> >>> > > > >> >>>>>
> >>> > > > >> >>>>> On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote:
> >>> > > > >> >>>>>>
> >>> > > > >> >>>>>> it gives the fn/transform the ability to save a state -
> >>> > > > >> >>>>>> it can get it back on "restart" / whatever unit we can
> >>> > > > >> >>>>>> use, probably runner dependent? Without that you need to
> >>> > > > >> >>>>>> rewrite all IO usage with something like the previous
> >>> > > > >> >>>>>> pattern, which makes the IO not self-sufficient and kind
> >>> > > > >> >>>>>> of pushes the entry cost and usage of beam way further.
> >>> > > > >> >>>>>>
> >>> > > > >> >>>>>> In my mind it is exactly what jbatch/spring-batch uses,
> >>> > > > >> >>>>>> but adapted to the beam (stream in particular) case.
> >>> > > > >> >>>>>>
> >>> > > > >> >>>>>> Romain Manni-Bucau
> >>> > > > >> >>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >>> > > > >> >>>>>>
> >>> > > > >> >>>>>>
> >>> > > > >> >>>>>> 2017-11-17 6:49 GMT+01:00 Reuven Lax
> >>> > > > >> >>>>>> <re...@google.com.invalid>:
> >>> > > > >> >>>>>>>
> >>> > > > >> >>>>>>> Romain,
> >>> > > > >> >>>>>>>
> >>> > > > >> >>>>>>> Can you define what you mean by checkpoint? What are
> >>> > > > >> >>>>>>> the semantics, what does it accomplish?
> >>> > > > >> >>>>>>>
> >>> > > > >> >>>>>>> Reuven
> >>> > > > >> >>>>>>>
> >>> > > > >> >>>>>>> On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau
> >>> > > > >> >>>>>>> <rmannibu...@gmail.com> wrote:
> >>> > > > >> >>>>>>>
> >>> > > > >> >>>>>>>> Yes, what I proposed earlier was:
> >>> > > > >> >>>>>>>>
> >>> > > > >> >>>>>>>> I. checkpoint marker:
> >>> > > > >> >>>>>>>>
> >>> > > > >> >>>>>>>> @AnyBeamAnnotation
> >>> > > > >> >>>>>>>> @CheckpointAfter
> >>> > > > >> >>>>>>>> public void someHook(SomeContext ctx);
> >>> > > > >> >>>>>>>>
> >>> > > > >> >>>>>>>>
> >>> > > > >> >>>>>>>> II. pipeline.apply(ParDo.of(new MyFn())
> >>> > > > >> >>>>>>>>     .withCheckpointAlgorithm(new CountingAlgo()))
> >>> > > > >> >>>>>>>>
> >>> > > > >> >>>>>>>> III. (I like this one less)
> >>> > > > >> >>>>>>>>
> >>> > > > >> >>>>>>>> // in the dofn
> >>> > > > >> >>>>>>>> @CheckpointTester
> >>> > > > >> >>>>>>>> public boolean shouldCheckpoint();
> >>> > > > >> >>>>>>>>
> >>> > > > >> >>>>>>>> IV. @Checkpointer Serializable getCheckpoint();
> >>> > > > >> >>>>>>>>     in the dofn, per element
> >>> > > > >> >>>>>>>>
> >>> > > > >> >>>>>>>>
> >>> > > > >> >>>>>>>> Romain Manni-Bucau
> >>> > > > >> >>>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >>> > > > >> >>>>>>>>
> >>> > > > >> >>>>>>>>
> >>> > > > >> >>>>>>>> 2017-11-17 6:06 GMT+01:00 Raghu Angadi
> >>> > > > >> >>>>>>>> <rang...@google.com.invalid>:
> >>> > > > >> >>>>>>>>>
> >>> > > > >> >>>>>>>>> How would you define it (rough API is fine)? Without
> >>> > > > >> >>>>>>>>> more details, it is not easy to see wider
> >>> > > > >> >>>>>>>>> applicability and feasibility in runners.
> >>> > > > >> >>>>>>>>>
> >>> > > > >> >>>>>>>>> On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau
> >>> > > > >> >>>>>>>>> <rmannibu...@gmail.com> wrote:
> >>> > > > >> >>>>>>>>>
> >>> > > > >> >>>>>>>>>> This is a fair summary of the current state, but
> >>> > > > >> >>>>>>>>>> also where beam can have a very strong added value
> >>> > > > >> >>>>>>>>>> and make big data great and smooth.
> >>> > > > >> >>>>>>>>>>
> >>> > > > >> >>>>>>>>>> Instead of this replay feature, isn't checkpointing
> >>> > > > >> >>>>>>>>>> willable? In particular with SDF, no?
> >>> > > > >> >>>>>>>>>>
> >>> > > > >> >>>>>>>>>>
> >>> > > > >> >>>>>>>>>> On 16 Nov 2017 at 19:50, "Raghu Angadi"
> >>> > > > >> >>>>>>>>>> <rang...@google.com.invalid> wrote:
> >>> > > > >> >>>>>>>>>>
> >>> > > > >> >>>>>>>>>>> Core issue here is that there is no explicit
> >>> > > > >> >>>>>>>>>>> concept of 'checkpoint' in Beam (UnboundedSource
> >>> > > > >> >>>>>>>>>>> has a method 'getCheckpointMark' but that refers to
> >>> > > > >> >>>>>>>>>>> the checkpoint on the external source). Runners do
> >>> > > > >> >>>>>>>>>>> checkpoint internally as an implementation detail.
> >>> > > > >> >>>>>>>>>>> Flink's checkpoint model is entirely different from
> >>> > > > >> >>>>>>>>>>> Dataflow's and Spark's.
> >>> > > > >> >>>>>>>>>>>
> >>> > > > >> >>>>>>>>>>> @StableReplay helps, but it does not explicitly
> >>> > > > >> >>>>>>>>>>> talk about a checkpoint by design.
> >>> > > > >> >>>>>>>>>>>
> >>> > > > >> >>>>>>>>>>> If you are looking to achieve some guarantees with
> >>> > > > >> >>>>>>>>>>> a sink/DoFn, I think it is better to start with the
> >>> > > > >> >>>>>>>>>>> requirements. I worked on an exactly-once sink for
> >>> > > > >> >>>>>>>>>>> Kafka (see KafkaIO.write().withEOS()), where we
> >>> > > > >> >>>>>>>>>>> essentially reshard the elements and assign
> >>> > > > >> >>>>>>>>>>> sequence numbers to elements within each shard.
> >>> > > > >> >>>>>>>>>>> Duplicates in replays are avoided based on these
> >>> > > > >> >>>>>>>>>>> sequence numbers. The DoFn state API is used to
> >>> > > > >> >>>>>>>>>>> buffer out-of-order replays. The implementation
> >>> > > > >> >>>>>>>>>>> strategy works in Dataflow but not in Flink, which
> >>> > > > >> >>>>>>>>>>> has a horizontal checkpoint. KafkaIO checks for
> >>> > > > >> >>>>>>>>>>> compatibility.
> >>> > > > >> >>>>>>>>>>>
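
A very rough sketch of the "sequence number + state" idea described above
(names, the SequencedRecord type and the write call are placeholders; the
real KafkaIO EOS sink also buffers out-of-order replays):

  class DedupWriteFn extends DoFn<KV<Integer, SequencedRecord>, Void> {
    private transient ExternalClient client; // placeholder external client

    @StateId("lastSeq")
    private final StateSpec<ValueState<Long>> lastSeqSpec = StateSpecs.value();

    @ProcessElement
    public void processElement(ProcessContext c,
                               @StateId("lastSeq") ValueState<Long> lastSeq) {
      long seq = c.element().getValue().getSequenceNumber();
      Long last = lastSeq.read();
      if (last != null && seq <= last) {
        return; // already written in a previously replayed bundle: drop it
      }
      client.write(c.element().getValue()); // placeholder external write
      lastSeq.write(seq);
    }
  }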
> >>> > > > >> >>>>>>>>>>> On Wed, Nov 15, 2017 at 12:38 AM, Romain
> >>> > > > >> >>>>>>>>>>> Manni-Bucau <rmannibu...@gmail.com> wrote:
> >>> > > > >> >>>>>>>>>>>
> >>> > > > >> >>>>>>>>>>>> Hi guys,
> >>> > > > >> >>>>>>>>>>>>
> >>> > > > >> >>>>>>>>>>>> The subject is a bit provocative but the topic is
> >>> > > > >> >>>>>>>>>>>> real and coming again and again with beam usage:
> >>> > > > >> >>>>>>>>>>>> how can a dofn handle some "chunking"?
> >>> > > > >> >>>>>>>>>>>>
> >>> > > > >> >>>>>>>>>>>> The need is to be able to commit every N records,
> >>> > > > >> >>>>>>>>>>>> but with N not too big.
> >>> > > > >> >>>>>>>>>>>>
> >>> > > > >> >>>>>>>>>>>> The natural API for that in beam is the bundle
> >>> > > > >> >>>>>>>>>>>> one, but bundles are not reliable since they can
> >>> > > > >> >>>>>>>>>>>> be very small (flink) - we can say it is "ok" even
> >>> > > > >> >>>>>>>>>>>> if it has some perf impacts - or too big (spark
> >>> > > > >> >>>>>>>>>>>> does full size / #workers).
> >>> > > > >> >>>>>>>>>>>>
> >>> > > > >> >>>>>>>>>>>> The workaround is what we see in the ES I/O: a
> >>> > > > >> >>>>>>>>>>>> maxSize which does an eager flush. The issue is
> >>> > > > >> >>>>>>>>>>>> that then the checkpoint is not respected and you
> >>> > > > >> >>>>>>>>>>>> can process the same records multiple times.
> >>> > > > >> >>>>>>>>>>>>
> >>> > > > >> >>>>>>>>>>>> Any plan to make this API reliable and
> >>> > > > >> >>>>>>>>>>>> controllable from a beam point of view (at least
> >>> > > > >> >>>>>>>>>>>> in a max manner)?
> >>> > > > >> >>>>>>>>>>>>
> >>> > > > >> >>>>>>>>>>>> Thanks,
> >>> > > > >> >>>>>>>>>>>> Romain Manni-Bucau
> >>> > > > >> >>>>>>>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >>> > > > >> >>>>>>>>>>>>
> >>> > > > >> >>>>>>>>>>>
> >>> > > > >> >>>>>>>>>>
> >>> > > > >> >>>>>>>>
> >>> > > > >> >>>>>
> >>> > > > >> >>>>> --
> >>> > > > >> >>>>> Jean-Baptiste Onofré
> >>> > > > >> >>>>> jbono...@apache.org
> >>> > > > >> >>>>> http://blog.nanthrax.net
> >>> > > > >> >>>>> Talend - http://www.talend.com
> >>> > > > >> >>>>>
> >>> > > > >> >>>
> >>> > > > >> >>
> >>> > > > >> >
> >>> > > > >> > --
> >>> > > > >> > Jean-Baptiste Onofré
> >>> > > > >> > jbono...@apache.org
> >>> > > > >> > http://blog.nanthrax.net
> >>> > > > >> > Talend - http://www.talend.com
> >>> > > > >>
> >>> > > >
> >>> > >
> >>> >
> >>>
>
